You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/04 08:02:27 UTC

[flink] 09/10: [FLINK-25432][runtime] Adds JobManagerRunnerRegistry and integrates it into the Dispatcher

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f798996dad0e41ced3e9e293ee81d0caf225874
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Dec 15 14:54:27 2021 +0100

    [FLINK-25432][runtime] Adds JobManagerRunnerRegistry and integrates it into the Dispatcher
---
 .../DefaultJobManagerRunnerRegistry.java           | 127 ++++++++++++
 .../flink/runtime/dispatcher/Dispatcher.java       |  65 +++---
 .../dispatcher/JobManagerRunnerRegistry.java       |  68 ++++++
 .../DefaultJobManagerRunnerRegistryTest.java       | 230 +++++++++++++++++++++
 .../TestingJobManagerRunnerRegistry.java           | 199 ++++++++++++++++++
 .../runtime/jobmaster/TestingJobManagerRunner.java |   6 +-
 6 files changed, 664 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
new file mode 100644
index 0000000..2ba54e9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code DefaultJobManagerRunnerRegistry} is the default implementation of the {@link
+ * JobManagerRunnerRegistry} interface. All methods of this class are expected to be called from
+ * within the main thread. The mutating operations ({@link #register}, {@link #unregister} and the
+ * cleanup methods) assert this at runtime.
+ */
+public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry {
+
+    @VisibleForTesting final Map<JobID, JobManagerRunner> jobManagerRunners;
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    /**
+     * Creates an empty registry.
+     *
+     * @param initialCapacity The initial capacity of the underlying map; has to be positive.
+     * @param mainThreadExecutor The executor used to assert that mutating operations are called
+     *     from within the main thread.
+     */
+    public DefaultJobManagerRunnerRegistry(
+            int initialCapacity, ComponentMainThreadExecutor mainThreadExecutor) {
+        Preconditions.checkArgument(
+                initialCapacity > 0,
+                "The initial capacity has to be positive, but was %s.",
+                initialCapacity);
+        this.jobManagerRunners = new HashMap<>(initialCapacity);
+        this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
+    }
+
+    @Override
+    public boolean isRegistered(JobID jobId) {
+        return this.jobManagerRunners.containsKey(jobId);
+    }
+
+    @Override
+    public void register(JobManagerRunner jobManagerRunner) {
+        mainThreadExecutor.assertRunningInMainThread();
+        final JobID jobId = jobManagerRunner.getJobID();
+        Preconditions.checkArgument(
+                !isRegistered(jobId), "A job with the ID %s is already registered.", jobId);
+        this.jobManagerRunners.put(jobId, jobManagerRunner);
+    }
+
+    @Override
+    public JobManagerRunner get(JobID jobId) {
+        assertJobRegistered(jobId);
+        return this.jobManagerRunners.get(jobId);
+    }
+
+    @Override
+    public int size() {
+        return this.jobManagerRunners.size();
+    }
+
+    @Override
+    public Set<JobID> getRunningJobIds() {
+        return new HashSet<>(this.jobManagerRunners.keySet());
+    }
+
+    @Override
+    public Collection<JobManagerRunner> getJobManagerRunners() {
+        return new ArrayList<>(this.jobManagerRunners.values());
+    }
+
+    @Override
+    public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor unusedExecutor) {
+        return cleanup(jobId);
+    }
+
+    @Override
+    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor unusedExecutor) {
+        return cleanup(jobId);
+    }
+
+    /**
+     * Unregisters and closes the {@link JobManagerRunner} of the given job; job IDs without a
+     * registered runner are ignored. The returned future is already completed when this method
+     * returns, exceptionally if closing the runner failed (the runner is unregistered either way).
+     *
+     * <p>NOTE(review): {@code close()} presumably waits for the runner to terminate, i.e. this may
+     * block the main thread. Confirm that this is acceptable.
+     */
+    private CompletableFuture<Void> cleanup(JobID jobId) {
+        mainThreadExecutor.assertRunningInMainThread();
+        if (isRegistered(jobId)) {
+            try {
+                unregister(jobId).close();
+            } catch (Exception e) {
+                return FutureUtils.completedExceptionally(e);
+            }
+        }
+
+        return FutureUtils.completedVoidFuture();
+    }
+
+    /**
+     * Removes the {@link JobManagerRunner} registered for the given {@code JobID} from this
+     * registry and returns it without closing it.
+     *
+     * @throws NoSuchElementException if no {@code JobManagerRunner} is registered for the given
+     *     {@code JobID}.
+     */
+    @Override
+    public JobManagerRunner unregister(JobID jobId) {
+        mainThreadExecutor.assertRunningInMainThread();
+        assertJobRegistered(jobId);
+        return this.jobManagerRunners.remove(jobId);
+    }
+
+    /** Throws a {@link NoSuchElementException} if no runner is registered for the given job. */
+    private void assertJobRegistered(JobID jobId) {
+        if (!isRegistered(jobId)) {
+            throw new NoSuchElementException(
+                    "There is no running job registered for the job ID " + jobId);
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 6599115..88f9237 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -123,7 +123,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
     private final FatalErrorHandler fatalErrorHandler;
 
-    private final Map<JobID, JobManagerRunner> runningJobs;
+    private final JobManagerRunnerRegistry jobManagerRunnerRegistry;
 
     private final Collection<JobGraph> recoveredJobs;
 
@@ -184,7 +184,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                 JobManagerSharedServices.fromConfiguration(
                         configuration, blobServer, fatalErrorHandler);
 
-        runningJobs = new HashMap<>(16);
+        jobManagerRunnerRegistry =
+                new DefaultJobManagerRunnerRegistry(16, this.getMainThreadExecutor());
 
         this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
 
@@ -385,7 +386,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
      * @throws FlinkException if the job scheduling status cannot be retrieved
      */
     private boolean isDuplicateJob(JobID jobId) throws FlinkException {
-        return isInGloballyTerminalState(jobId) || runningJobs.containsKey(jobId);
+        return isInGloballyTerminalState(jobId) || jobManagerRunnerRegistry.isRegistered(jobId);
     }
 
     /**
@@ -458,12 +459,12 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     }
 
     private void runJob(JobGraph jobGraph, ExecutionType executionType) throws Exception {
-        Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
+        Preconditions.checkState(!jobManagerRunnerRegistry.isRegistered(jobGraph.getJobID()));
         long initializationTimestamp = System.currentTimeMillis();
         JobManagerRunner jobManagerRunner =
                 createJobManagerRunner(jobGraph, initializationTimestamp);
 
-        runningJobs.put(jobGraph.getJobID(), jobManagerRunner);
+        jobManagerRunnerRegistry.register(jobManagerRunner);
 
         final JobID jobId = jobGraph.getJobID();
 
@@ -473,7 +474,9 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                         .handleAsync(
                                 (jobManagerRunnerResult, throwable) -> {
                                     Preconditions.checkState(
-                                            runningJobs.get(jobId) == jobManagerRunner,
+                                            jobManagerRunnerRegistry.isRegistered(jobId)
+                                                    && jobManagerRunnerRegistry.get(jobId)
+                                                            == jobManagerRunner,
                                             "The job entry in runningJobs must be bound to the lifetime of the JobManagerRunner.");
 
                                     if (jobManagerRunnerResult != null) {
@@ -545,7 +548,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     @Override
     public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
         return CompletableFuture.completedFuture(
-                Collections.unmodifiableSet(new HashSet<>(runningJobs.keySet())));
+                Collections.unmodifiableSet(jobManagerRunnerRegistry.getRunningJobIds()));
     }
 
     @Override
@@ -690,9 +693,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
     @Override
     public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
-        JobManagerRunner job = runningJobs.get(jobId);
-
-        if (job == null) {
+        if (!jobManagerRunnerRegistry.isRegistered(jobId)) {
             final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId);
 
             if (executionGraphInfo == null) {
@@ -701,15 +702,17 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                 return CompletableFuture.completedFuture(
                         JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
             }
-        } else {
-            return job.getResultFuture()
-                    .thenApply(
-                            jobManagerRunnerResult ->
-                                    JobResult.createFrom(
-                                            jobManagerRunnerResult
-                                                    .getExecutionGraphInfo()
-                                                    .getArchivedExecutionGraph()));
         }
+
+        final JobManagerRunner jobManagerRunner = jobManagerRunnerRegistry.get(jobId);
+        return jobManagerRunner
+                .getResultFuture()
+                .thenApply(
+                        jobManagerRunnerResult ->
+                                JobResult.createFrom(
+                                        jobManagerRunnerResult
+                                                .getExecutionGraphInfo()
+                                                .getArchivedExecutionGraph()));
     }
 
     @Override
@@ -856,7 +859,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     }
 
     private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJobState) {
-        final JobManagerRunner job = checkNotNull(runningJobs.remove(jobId));
+        final JobManagerRunner job = checkNotNull(jobManagerRunnerRegistry.unregister(jobId));
         return CompletableFuture.supplyAsync(
                         () -> cleanUpJobGraph(jobId, cleanupJobState.cleanupHAData), ioExecutor)
                 .thenCompose(
@@ -958,7 +961,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     private void terminateRunningJobs() {
         log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());
 
-        final HashSet<JobID> jobsToRemove = new HashSet<>(runningJobs.keySet());
+        final Set<JobID> jobsToRemove = jobManagerRunnerRegistry.getRunningJobIds();
 
         for (JobID jobId : jobsToRemove) {
             terminateJob(jobId);
@@ -966,9 +969,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     }
 
     private void terminateJob(JobID jobId) {
-        final JobManagerRunner jobManagerRunner = runningJobs.get(jobId);
-
-        if (jobManagerRunner != null) {
+        if (jobManagerRunnerRegistry.isRegistered(jobId)) {
+            final JobManagerRunner jobManagerRunner = jobManagerRunnerRegistry.get(jobId);
             jobManagerRunner.closeAsync();
         }
     }
@@ -1083,11 +1085,11 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
     /** Ensures that the JobMasterGateway is available. */
     private CompletableFuture<JobMasterGateway> getJobMasterGateway(JobID jobId) {
-        JobManagerRunner job = runningJobs.get(jobId);
-        if (job == null) {
+        if (!jobManagerRunnerRegistry.isRegistered(jobId)) {
             return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
         }
 
+        final JobManagerRunner job = jobManagerRunnerRegistry.get(jobId);
         if (!job.isInitialized()) {
             return FutureUtils.completedExceptionally(
                     new UnavailableDispatcherOperationException(
@@ -1107,7 +1109,9 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     }
 
     private Optional<JobManagerRunner> getJobManagerRunner(JobID jobId) {
-        return Optional.ofNullable(runningJobs.get(jobId));
+        return jobManagerRunnerRegistry.isRegistered(jobId)
+                ? Optional.of(jobManagerRunnerRegistry.get(jobId))
+                : Optional.empty();
     }
 
     private <T> CompletableFuture<T> runResourceManagerCommand(
@@ -1129,9 +1133,9 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
             Function<JobManagerRunner, CompletableFuture<T>> queryFunction) {
 
         List<CompletableFuture<Optional<T>>> optionalJobInformation =
-                new ArrayList<>(runningJobs.size());
+                new ArrayList<>(jobManagerRunnerRegistry.size());
 
-        for (JobManagerRunner job : runningJobs.values()) {
+        for (JobManagerRunner job : jobManagerRunnerRegistry.getJobManagerRunners()) {
             final CompletableFuture<Optional<T>> queryResult =
                     queryFunction
                             .apply(job)
@@ -1165,7 +1169,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     }
 
     CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
-        if (runningJobs.containsKey(jobId)) {
+        if (jobManagerRunnerRegistry.isRegistered(jobId)) {
             return FutureUtils.completedExceptionally(
                     new DispatcherException(
                             String.format("Job with job id %s is still running.", jobId)));
@@ -1176,7 +1180,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     }
 
     private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
-        jobManagerMetricGroup.gauge(MetricNames.NUM_RUNNING_JOBS, () -> (long) runningJobs.size());
+        jobManagerMetricGroup.gauge(
+                MetricNames.NUM_RUNNING_JOBS, () -> (long) jobManagerRunnerRegistry.size());
     }
 
     public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistry.java
new file mode 100644
index 0000000..dfd342c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistry.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource;
+import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+
+import java.util.Collection;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * {@code JobManagerRunnerRegistry} collects running jobs represented by {@link JobManagerRunner}.
+ */
+public interface JobManagerRunnerRegistry
+        extends LocallyCleanableResource, GloballyCleanableResource {
+
+    /**
+     * Checks whether a {@link JobManagerRunner} is registered under the given {@link JobID}.
+     *
+     * @param jobId The {@code JobID} to check.
+     * @return {@code true}, if a {@code JobManagerRunner} is registered; {@code false} otherwise.
+     */
+    boolean isRegistered(JobID jobId);
+
+    /**
+     * Registers the given {@link JobManagerRunner} instance under its {@link JobID}. The
+     * {@code JobID} must not be registered already.
+     *
+     * @see #isRegistered(JobID)
+     */
+    void register(JobManagerRunner jobManagerRunner);
+
+    /**
+     * Returns the {@link JobManagerRunner} for the given {@code JobID}.
+     *
+     * @throws NoSuchElementException if the passed {@code JobID} does not belong to a registered
+     *     {@code JobManagerRunner}.
+     * @see #isRegistered(JobID)
+     */
+    JobManagerRunner get(JobID jobId);
+
+    /** Returns the number of {@link JobManagerRunner} instances currently registered. */
+    int size();
+
+    /**
+     * Returns the {@link JobID}s of all registered {@link JobManagerRunner} instances. The
+     * returned set is a snapshot that is not affected by subsequent (un)registrations.
+     */
+    Set<JobID> getRunningJobIds();
+
+    /**
+     * Returns the registered {@link JobManagerRunner} instances. The returned collection is a
+     * snapshot that is not affected by subsequent (un)registrations.
+     */
+    Collection<JobManagerRunner> getJobManagerRunners();
+
+    /**
+     * Unregisters the {@link JobManagerRunner} with the given {@code JobID} and returns it.
+     *
+     * @throws NoSuchElementException if the passed {@code JobID} does not belong to a registered
+     *     {@code JobManagerRunner}.
+     * @see #isRegistered(JobID)
+     */
+    JobManagerRunner unregister(JobID jobId);
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
new file mode 100644
index 0000000..6eca873
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * {@code DefaultJobManagerRunnerRegistryTest} tests the functionality of {@link
+ * DefaultJobManagerRunnerRegistry}.
+ */
+public class DefaultJobManagerRunnerRegistryTest {
+
+    private JobManagerRunnerRegistry testInstance;
+
+    @BeforeEach
+    public void setup() {
+        // forMainThread() makes the calling test thread the registry's main thread
+        testInstance =
+                new DefaultJobManagerRunnerRegistry(
+                        4, ComponentMainThreadExecutorServiceAdapter.forMainThread());
+    }
+
+    @Test
+    public void testIsRegistered() {
+        final JobID jobId = new JobID();
+        testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId).build());
+        assertThat(testInstance.isRegistered(jobId)).isTrue();
+    }
+
+    @Test
+    public void testIsNotRegistered() {
+        assertThat(testInstance.isRegistered(new JobID())).isFalse();
+    }
+
+    @Test
+    public void testRegister() {
+        final JobID jobId = new JobID();
+        final JobManagerRunner jobManagerRunner =
+                TestingJobManagerRunner.newBuilder().setJobId(jobId).build();
+        testInstance.register(jobManagerRunner);
+
+        assertThat(testInstance.isRegistered(jobId)).isTrue();
+        assertThat(testInstance.getJobManagerRunners()).containsExactly(jobManagerRunner);
+    }
+
+    @Test
+    public void testRegisteringTwiceCausesFailure() {
+        final JobID jobId = new JobID();
+        testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId).build());
+        assertThat(testInstance.isRegistered(jobId)).isTrue();
+
+        assertThatThrownBy(
+                        () ->
+                                testInstance.register(
+                                        TestingJobManagerRunner.newBuilder()
+                                                .setJobId(jobId)
+                                                .build()))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    public void testGet() {
+        final JobID jobId = new JobID();
+        final JobManagerRunner jobManagerRunner =
+                TestingJobManagerRunner.newBuilder().setJobId(jobId).build();
+        testInstance.register(jobManagerRunner);
+
+        assertThat(testInstance.get(jobId)).isEqualTo(jobManagerRunner);
+    }
+
+    @Test
+    public void testGetOnNonExistingJobManagerRunner() {
+        assertThatThrownBy(() -> testInstance.get(new JobID()))
+                .isInstanceOf(NoSuchElementException.class);
+    }
+
+    @Test
+    public void testSize() {
+        assertThat(testInstance.size()).isEqualTo(0);
+        testInstance.register(TestingJobManagerRunner.newBuilder().build());
+        assertThat(testInstance.size()).isEqualTo(1);
+        testInstance.register(TestingJobManagerRunner.newBuilder().build());
+        assertThat(testInstance.size()).isEqualTo(2);
+    }
+
+    @Test
+    public void testGetRunningJobIds() {
+        assertThat(testInstance.getRunningJobIds()).isEmpty();
+
+        final JobID jobId0 = new JobID();
+        final JobID jobId1 = new JobID();
+        testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId0).build());
+        testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId1).build());
+
+        assertThat(testInstance.getRunningJobIds()).containsExactlyInAnyOrder(jobId0, jobId1);
+    }
+
+    @Test
+    public void testGetJobManagerRunners() {
+        assertThat(testInstance.getJobManagerRunners()).isEmpty();
+
+        final JobManagerRunner jobManagerRunner0 = TestingJobManagerRunner.newBuilder().build();
+        final JobManagerRunner jobManagerRunner1 = TestingJobManagerRunner.newBuilder().build();
+        testInstance.register(jobManagerRunner0);
+        testInstance.register(jobManagerRunner1);
+
+        assertThat(testInstance.getJobManagerRunners())
+                .containsExactlyInAnyOrder(jobManagerRunner0, jobManagerRunner1);
+    }
+
+    @Test
+    public void testUnregister() {
+        final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
+        final JobID jobId = jobManagerRunner.getJobID();
+
+        assertThat(testInstance.unregister(jobId)).isSameAs(jobManagerRunner);
+        assertThat(testInstance.isRegistered(jobId)).isFalse();
+        // unregistering alone does not close the runner
+        assertThat(jobManagerRunner.getTerminationFuture()).isNotDone();
+    }
+
+    @Test
+    public void testUnregisterOnNonExistingJobManagerRunner() {
+        assertThatThrownBy(() -> testInstance.unregister(new JobID()))
+                .isInstanceOf(NoSuchElementException.class);
+    }
+
+    @Test
+    public void testSuccessfulLocalCleanup() {
+        final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
+
+        assertThat(
+                        testInstance.localCleanupAsync(
+                                jobManagerRunner.getJobID(), Executors.directExecutor()))
+                .isCompleted();
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+        assertThat(jobManagerRunner.getTerminationFuture()).isCompleted();
+    }
+
+    @Test
+    public void testFailingLocalCleanup() {
+        final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
+
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue();
+        assertThat(jobManagerRunner.getTerminationFuture()).isNotDone();
+
+        final RuntimeException expectedException = new RuntimeException("Expected exception");
+        jobManagerRunner.completeTerminationFutureExceptionally(expectedException);
+
+        assertThat(
+                        testInstance.localCleanupAsync(
+                                jobManagerRunner.getJobID(), Executors.directExecutor()))
+                .failsWithin(Duration.ZERO)
+                .withThrowableOfType(ExecutionException.class)
+                .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
+                .hasExactlyElementsOfTypes(
+                        ExecutionException.class,
+                        FlinkException.class,
+                        expectedException.getClass())
+                .last()
+                .isEqualTo(expectedException);
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+    }
+
+    @Test
+    public void testSuccessfulLocalCleanupAsync() {
+        final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
+
+        final CompletableFuture<Void> cleanupResult =
+                testInstance.localCleanupAsync(
+                        jobManagerRunner.getJobID(), Executors.directExecutor());
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+        assertThat(cleanupResult).isCompleted();
+    }
+
+    @Test
+    public void testFailingLocalCleanupAsync() {
+        final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
+
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue();
+        assertThat(jobManagerRunner.getTerminationFuture()).isNotDone();
+
+        final RuntimeException expectedException = new RuntimeException("Expected exception");
+        jobManagerRunner.completeTerminationFutureExceptionally(expectedException);
+
+        final CompletableFuture<Void> cleanupResult =
+                testInstance.localCleanupAsync(
+                        jobManagerRunner.getJobID(), Executors.directExecutor());
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+        assertThat(cleanupResult)
+                .isCompletedExceptionally()
+                .failsWithin(Duration.ZERO)
+                .withThrowableOfType(ExecutionException.class)
+                .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
+                .hasExactlyElementsOfTypes(
+                        ExecutionException.class,
+                        FlinkException.class,
+                        expectedException.getClass())
+                .last()
+                .isEqualTo(expectedException);
+    }
+
+    @Test
+    public void testLocalCleanupAsyncOnUnknownJobId() {
+        assertThat(testInstance.localCleanupAsync(new JobID(), Executors.directExecutor()))
+                .isCompleted();
+    }
+
+    @Test
+    public void testSuccessfulGlobalCleanup() {
+        final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
+
+        assertThat(
+                        testInstance.globalCleanupAsync(
+                                jobManagerRunner.getJobID(), Executors.directExecutor()))
+                .isCompleted();
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+        assertThat(jobManagerRunner.getTerminationFuture()).isCompleted();
+    }
+
+    @Test
+    public void testGlobalCleanupAsyncOnUnknownJobId() {
+        assertThat(testInstance.globalCleanupAsync(new JobID(), Executors.directExecutor()))
+                .isCompleted();
+    }
+
+    /** Registers a fresh {@link TestingJobManagerRunner} and verifies its initial state. */
+    private TestingJobManagerRunner registerTestingJobManagerRunner() {
+        final TestingJobManagerRunner jobManagerRunner =
+                TestingJobManagerRunner.newBuilder().build();
+        testInstance.register(jobManagerRunner);
+
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue();
+        assertThat(jobManagerRunner.getTerminationFuture()).isNotDone();
+
+        return jobManagerRunner;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java
new file mode 100644
index 0000000..c9e60df
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * {@code TestingJobManagerRunnerRegistry} is a test implementation of {@link
+ * JobManagerRunnerRegistry} in which every interface method delegates to a callback that is
+ * configured through {@link #builder()}.
+ */
+public class TestingJobManagerRunnerRegistry implements JobManagerRunnerRegistry {
+
+    private final Function<JobID, Boolean> isRegisteredFunction;
+    private final Consumer<JobManagerRunner> registerConsumer;
+    private final Function<JobID, JobManagerRunner> getFunction;
+    private final Supplier<Integer> sizeSupplier;
+    private final Supplier<Set<JobID>> getRunningJobIdsSupplier;
+    private final Supplier<Collection<JobManagerRunner>> getJobManagerRunnersSupplier;
+    private final Function<JobID, JobManagerRunner> unregisterFunction;
+    private final BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupAsyncFunction;
+    private final BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupAsyncFunction;
+
+    /** Only invoked by {@link Builder#build()}, which never passes {@code null} callbacks. */
+    private TestingJobManagerRunnerRegistry(
+            Function<JobID, Boolean> isRegisteredFunction,
+            Consumer<JobManagerRunner> registerConsumer,
+            Function<JobID, JobManagerRunner> getFunction,
+            Supplier<Integer> sizeSupplier,
+            Supplier<Set<JobID>> getRunningJobIdsSupplier,
+            Supplier<Collection<JobManagerRunner>> getJobManagerRunnersSupplier,
+            Function<JobID, JobManagerRunner> unregisterFunction,
+            BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupAsyncFunction,
+            BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupAsyncFunction) {
+        this.isRegisteredFunction = isRegisteredFunction;
+        this.registerConsumer = registerConsumer;
+        this.getFunction = getFunction;
+        this.sizeSupplier = sizeSupplier;
+        this.getRunningJobIdsSupplier = getRunningJobIdsSupplier;
+        this.getJobManagerRunnersSupplier = getJobManagerRunnersSupplier;
+        this.unregisterFunction = unregisterFunction;
+        this.localCleanupAsyncFunction = localCleanupAsyncFunction;
+        this.globalCleanupAsyncFunction = globalCleanupAsyncFunction;
+    }
+
+    @Override
+    public boolean isRegistered(JobID jobId) {
+        return isRegisteredFunction.apply(jobId);
+    }
+
+    @Override
+    public void register(JobManagerRunner jobManagerRunner) {
+        registerConsumer.accept(jobManagerRunner);
+    }
+
+    @Override
+    public JobManagerRunner get(JobID jobId) {
+        return getFunction.apply(jobId);
+    }
+
+    @Override
+    public int size() {
+        return sizeSupplier.get();
+    }
+
+    @Override
+    public Set<JobID> getRunningJobIds() {
+        return getRunningJobIdsSupplier.get();
+    }
+
+    @Override
+    public Collection<JobManagerRunner> getJobManagerRunners() {
+        return getJobManagerRunnersSupplier.get();
+    }
+
+    @Override
+    public JobManagerRunner unregister(JobID jobId) {
+        return unregisterFunction.apply(jobId);
+    }
+
+    @Override
+    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) {
+        return localCleanupAsyncFunction.apply(jobId, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
+        return globalCleanupAsyncFunction.apply(jobId, executor);
+    }
+
+    /** Returns a new {@link Builder} that starts out with the default callbacks. */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * {@code Builder} for creating {@code TestingJobManagerRunnerRegistry} instances.
+     *
+     * <p>Callbacks that are not set explicitly fall back to these defaults: {@code isRegistered}
+     * returns {@code true}, {@code register} does nothing, {@code get} and {@code unregister}
+     * return {@code null}, {@code size} returns {@code 0}, {@code getRunningJobIds} and {@code
+     * getJobManagerRunners} return empty collections, and both cleanup methods return an already
+     * completed future. The defaults are independent of each other (e.g. {@code isRegistered}
+     * reports {@code true} while {@code get} returns {@code null}), so a test that relies on a
+     * consistent registry state should configure the related callbacks together.
+     *
+     * <p>All {@code with*} methods reject {@code null} so that a misconfigured test fails while
+     * the builder is set up instead of with an NPE when the registry is invoked later on.
+     */
+    public static class Builder {
+
+        private Function<JobID, Boolean> isRegisteredFunction = ignoredJobId -> true;
+        private Consumer<JobManagerRunner> registerConsumer = ignoredRunner -> {};
+        private Function<JobID, JobManagerRunner> getFunction = ignoredJobId -> null;
+        private Supplier<Integer> sizeSupplier = () -> 0;
+        private Supplier<Set<JobID>> getRunningJobIdsSupplier = Collections::emptySet;
+        private Supplier<Collection<JobManagerRunner>> getJobManagerRunnersSupplier =
+                Collections::emptyList;
+        private Function<JobID, JobManagerRunner> unregisterFunction = ignoredJobId -> null;
+        private BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupAsyncFunction =
+                (ignoredJobId, ignoredExecutor) -> FutureUtils.completedVoidFuture();
+        private BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupAsyncFunction =
+                (ignoredJobId, ignoredExecutor) -> FutureUtils.completedVoidFuture();
+
+        public Builder withIsRegisteredFunction(Function<JobID, Boolean> isRegisteredFunction) {
+            this.isRegisteredFunction =
+                    Objects.requireNonNull(isRegisteredFunction, "isRegisteredFunction");
+            return this;
+        }
+
+        public Builder withRegisterConsumer(Consumer<JobManagerRunner> registerConsumer) {
+            this.registerConsumer = Objects.requireNonNull(registerConsumer, "registerConsumer");
+            return this;
+        }
+
+        public Builder withGetFunction(Function<JobID, JobManagerRunner> getFunction) {
+            this.getFunction = Objects.requireNonNull(getFunction, "getFunction");
+            return this;
+        }
+
+        public Builder withSizeSupplier(Supplier<Integer> sizeSupplier) {
+            this.sizeSupplier = Objects.requireNonNull(sizeSupplier, "sizeSupplier");
+            return this;
+        }
+
+        public Builder withGetRunningJobIdsSupplier(Supplier<Set<JobID>> getRunningJobIdsSupplier) {
+            this.getRunningJobIdsSupplier =
+                    Objects.requireNonNull(getRunningJobIdsSupplier, "getRunningJobIdsSupplier");
+            return this;
+        }
+
+        public Builder withGetJobManagerRunnersSupplier(
+                Supplier<Collection<JobManagerRunner>> getJobManagerRunnersSupplier) {
+            this.getJobManagerRunnersSupplier =
+                    Objects.requireNonNull(
+                            getJobManagerRunnersSupplier, "getJobManagerRunnersSupplier");
+            return this;
+        }
+
+        public Builder withUnregisterFunction(
+                Function<JobID, JobManagerRunner> unregisterFunction) {
+            this.unregisterFunction =
+                    Objects.requireNonNull(unregisterFunction, "unregisterFunction");
+            return this;
+        }
+
+        public Builder withLocalCleanupAsyncFunction(
+                BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupAsyncFunction) {
+            this.localCleanupAsyncFunction =
+                    Objects.requireNonNull(localCleanupAsyncFunction, "localCleanupAsyncFunction");
+            return this;
+        }
+
+        public Builder withGlobalCleanupAsyncFunction(
+                BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupAsyncFunction) {
+            this.globalCleanupAsyncFunction =
+                    Objects.requireNonNull(
+                            globalCleanupAsyncFunction, "globalCleanupAsyncFunction");
+            return this;
+        }
+
+        /** Creates a registry that delegates to the callbacks configured on this builder. */
+        public TestingJobManagerRunnerRegistry build() {
+            return new TestingJobManagerRunnerRegistry(
+                    isRegisteredFunction,
+                    registerConsumer,
+                    getFunction,
+                    sizeSupplier,
+                    getRunningJobIdsSupplier,
+                    getJobManagerRunnersSupplier,
+                    unregisterFunction,
+                    localCleanupAsyncFunction,
+                    globalCleanupAsyncFunction);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index 0c0994d..7b6adf5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -155,6 +155,10 @@ public class TestingJobManagerRunner implements JobManagerRunner {
         terminationFuture.complete(null);
     }
 
+    /**
+     * Fails this runner's termination future with the given {@code throwable}. Following the
+     * {@link CompletableFuture#completeExceptionally(Throwable)} contract, this has no effect if
+     * the future has already been completed.
+     *
+     * @param throwable the cause the termination future is completed with
+     */
+    public void completeTerminationFutureExceptionally(Throwable throwable) {
+        terminationFuture.completeExceptionally(throwable);
+    }
+
+    /**
+     * Returns the termination future of this runner. Tests can fail it through {@link
+     * #completeTerminationFutureExceptionally(Throwable)}.
+     */
     public CompletableFuture<Void> getTerminationFuture() {
         return terminationFuture;
     }
@@ -166,7 +170,7 @@ public class TestingJobManagerRunner implements JobManagerRunner {
     /** {@code Builder} for instantiating {@link TestingJobManagerRunner} instances. */
     public static class Builder {
 
-        private JobID jobId = null;
+        private JobID jobId = new JobID();
         private boolean blockingTermination = false;
         private CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
                 new CompletableFuture<>();