You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/06 16:55:42 UTC

[flink] 03/03: [FLINK-25953][runtime] Reorganizes dispatcher cleanup related tests

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 919dca5fd093f8a1bf28383dbaf75677bab595b8
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Thu Feb 3 18:55:55 2022 +0100

    [FLINK-25953][runtime] Reorganizes dispatcher cleanup related tests
    
    This includes introducing a TestingDispatcher.Builder and aligning
    the usages of the TestingDispatcher instantiation between tests.
    Additionally, tests were renamed, obsolete tests were removed and
    cleanup-related tests moved into DispatcherResourceCleanupTest.
---
 .../DefaultJobManagerRunnerRegistry.java           |  12 +-
 .../flink/runtime/dispatcher/Dispatcher.java       |  69 ++-
 .../OnMainThreadJobManagerRunnerRegistry.java      | 112 ++++
 .../cleanup/DispatcherResourceCleanerFactory.java  |   2 +-
 .../runtime/dispatcher/AbstractDispatcherTest.java | 137 +----
 .../DefaultJobManagerRunnerRegistryTest.java       |   5 +-
 .../dispatcher/DispatcherFailoverITCase.java       |  10 +-
 .../dispatcher/DispatcherResourceCleanupTest.java  | 575 ++++++++++-----------
 .../flink/runtime/dispatcher/DispatcherTest.java   | 180 +------
 .../runtime/dispatcher/TestingDispatcher.java      | 281 ++++++++++
 .../cleanup/TestingResourceCleanerFactory.java     | 132 +++++
 11 files changed, 876 insertions(+), 639 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
index 2ba54e9..2eb0b30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -37,19 +36,15 @@ import java.util.concurrent.Executor;
 
 /**
  * {@code DefaultJobManagerRunnerRegistry} is the default implementation of the {@link
- * JobManagerRunnerRegistry} interface. All methods of this class are expected to be called from
- * within the main thread.
+ * JobManagerRunnerRegistry} interface.
  */
 public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry {
 
     @VisibleForTesting final Map<JobID, JobManagerRunner> jobManagerRunners;
-    private final ComponentMainThreadExecutor mainThreadExecutor;
 
-    public DefaultJobManagerRunnerRegistry(
-            int initialCapacity, ComponentMainThreadExecutor mainThreadExecutor) {
+    public DefaultJobManagerRunnerRegistry(int initialCapacity) {
         Preconditions.checkArgument(initialCapacity > 0);
         jobManagerRunners = new HashMap<>(initialCapacity);
-        this.mainThreadExecutor = mainThreadExecutor;
     }
 
     @Override
@@ -59,7 +54,6 @@ public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry
 
     @Override
     public void register(JobManagerRunner jobManagerRunner) {
-        mainThreadExecutor.assertRunningInMainThread();
         Preconditions.checkArgument(
                 !isRegistered(jobManagerRunner.getJobID()),
                 "A job with the ID %s is already registered.",
@@ -99,7 +93,6 @@ public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry
     }
 
     private CompletableFuture<Void> cleanup(JobID jobId) {
-        mainThreadExecutor.assertRunningInMainThread();
         if (isRegistered(jobId)) {
             try {
                 unregister(jobId).close();
@@ -113,7 +106,6 @@ public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry
 
     @Override
     public JobManagerRunner unregister(JobID jobId) {
-        mainThreadExecutor.assertRunningInMainThread();
         assertJobRegistered(jobId);
         return this.jobManagerRunners.remove(jobId);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index f15f294..5db8412 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.dispatcher;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -97,7 +98,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.Executor;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -113,6 +113,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
     public static final String DISPATCHER_NAME = "dispatcher";
 
+    private static final int INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY = 16;
+
     private final Configuration configuration;
 
     private final JobGraphWriter jobGraphWriter;
@@ -126,7 +128,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
     private final FatalErrorHandler fatalErrorHandler;
 
-    private final JobManagerRunnerRegistry jobManagerRunnerRegistry;
+    private final OnMainThreadJobManagerRunnerRegistry jobManagerRunnerRegistry;
 
     private final Collection<JobGraph> recoveredJobs;
 
@@ -140,8 +142,6 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
     private final HistoryServerArchivist historyServerArchivist;
 
-    private final Executor ioExecutor;
-
     @Nullable private final String metricServiceQueryAddress;
 
     private final Map<JobID, CompletableFuture<Void>> jobManagerRunnerTerminationFutures;
@@ -169,8 +169,48 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
             DispatcherBootstrapFactory dispatcherBootstrapFactory,
             DispatcherServices dispatcherServices)
             throws Exception {
+        this(
+                rpcService,
+                fencingToken,
+                recoveredJobs,
+                recoveredDirtyJobs,
+                dispatcherBootstrapFactory,
+                dispatcherServices,
+                new DefaultJobManagerRunnerRegistry(INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY));
+    }
+
+    private Dispatcher(
+            RpcService rpcService,
+            DispatcherId fencingToken,
+            Collection<JobGraph> recoveredJobs,
+            Collection<JobResult> globallyTerminatedJobs,
+            DispatcherBootstrapFactory dispatcherBootstrapFactory,
+            DispatcherServices dispatcherServices,
+            JobManagerRunnerRegistry jobManagerRunnerRegistry)
+            throws Exception {
+        this(
+                rpcService,
+                fencingToken,
+                recoveredJobs,
+                globallyTerminatedJobs,
+                dispatcherBootstrapFactory,
+                dispatcherServices,
+                jobManagerRunnerRegistry,
+                new DispatcherResourceCleanerFactory(jobManagerRunnerRegistry, dispatcherServices));
+    }
+
+    @VisibleForTesting
+    protected Dispatcher(
+            RpcService rpcService,
+            DispatcherId fencingToken,
+            Collection<JobGraph> recoveredJobs,
+            Collection<JobResult> recoveredDirtyJobs,
+            DispatcherBootstrapFactory dispatcherBootstrapFactory,
+            DispatcherServices dispatcherServices,
+            JobManagerRunnerRegistry jobManagerRunnerRegistry,
+            ResourceCleanerFactory resourceCleanerFactory)
+            throws Exception {
         super(rpcService, RpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken);
-        checkNotNull(dispatcherServices);
         assertRecoveredJobsAndDirtyJobResults(recoveredJobs, recoveredDirtyJobs);
 
         this.configuration = dispatcherServices.getConfiguration();
@@ -184,14 +224,14 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
         this.jobResultStore = dispatcherServices.getJobResultStore();
         this.jobManagerMetricGroup = dispatcherServices.getJobManagerMetricGroup();
         this.metricServiceQueryAddress = dispatcherServices.getMetricQueryServiceAddress();
-        this.ioExecutor = dispatcherServices.getIoExecutor();
 
         this.jobManagerSharedServices =
                 JobManagerSharedServices.fromConfiguration(
                         configuration, blobServer, fatalErrorHandler);
 
-        jobManagerRunnerRegistry =
-                new DefaultJobManagerRunnerRegistry(16, this.getMainThreadExecutor());
+        this.jobManagerRunnerRegistry =
+                new OnMainThreadJobManagerRunnerRegistry(
+                        jobManagerRunnerRegistry, this.getMainThreadExecutor());
 
         this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
 
@@ -199,7 +239,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
         this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory();
 
-        this.jobManagerRunnerTerminationFutures = new HashMap<>(2);
+        this.jobManagerRunnerTerminationFutures =
+                new HashMap<>(INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY);
 
         this.shutDownFuture = new CompletableFuture<>();
 
@@ -209,7 +250,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
         this.blobServer.retainJobs(
                 recoveredJobs.stream().map(JobGraph::getJobID).collect(Collectors.toSet()),
-                ioExecutor);
+                dispatcherServices.getIoExecutor());
 
         this.dispatcherCachedOperationsHandler =
                 new DispatcherCachedOperationsHandler(
@@ -217,8 +258,6 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                         this::triggerSavepointAndGetLocation,
                         this::stopWithSavepointAndGetLocation);
 
-        final ResourceCleanerFactory resourceCleanerFactory =
-                new DispatcherResourceCleanerFactory(jobManagerRunnerRegistry, dispatcherServices);
         this.localResourceCleaner =
                 resourceCleanerFactory.createLocalResourceCleaner(this.getMainThreadExecutor());
         this.globalResourceCleaner =
@@ -1119,7 +1158,11 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
     private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
         jobManagerMetricGroup.gauge(
-                MetricNames.NUM_RUNNING_JOBS, () -> (long) jobManagerRunnerRegistry.size());
+                MetricNames.NUM_RUNNING_JOBS,
+                // metrics can be called from anywhere and therefore, have to run without the main
+                // thread safeguard being triggered. For metrics, we can afford to be not 100%
+                // accurate
+                () -> (long) jobManagerRunnerRegistry.getWrappedDelegate().size());
     }
 
     public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java
new file mode 100644
index 0000000..4c16b42
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.util.WrappingProxy;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code OnMainThreadJobManagerRunnerRegistry} implements {@link JobManagerRunnerRegistry} guarding
+ * the passed {@code JobManagerRunnerRegistry} instance in a way that it only allows modifying
+ * methods to be executed on the component's main thread.
+ *
+ * @see ComponentMainThreadExecutor
+ */
+public class OnMainThreadJobManagerRunnerRegistry
+        implements JobManagerRunnerRegistry, WrappingProxy<JobManagerRunnerRegistry> {
+
+    private final JobManagerRunnerRegistry delegate;
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    public OnMainThreadJobManagerRunnerRegistry(
+            JobManagerRunnerRegistry delegate, ComponentMainThreadExecutor mainThreadExecutor) {
+        this.delegate = delegate;
+        this.mainThreadExecutor = mainThreadExecutor;
+    }
+
+    @Override
+    public boolean isRegistered(JobID jobId) {
+        mainThreadExecutor.assertRunningInMainThread();
+        return delegate.isRegistered(jobId);
+    }
+
+    @Override
+    public void register(JobManagerRunner jobManagerRunner) {
+        mainThreadExecutor.assertRunningInMainThread();
+        delegate.register(jobManagerRunner);
+    }
+
+    @Override
+    public JobManagerRunner get(JobID jobId) {
+        mainThreadExecutor.assertRunningInMainThread();
+        return delegate.get(jobId);
+    }
+
+    @Override
+    public int size() {
+        mainThreadExecutor.assertRunningInMainThread();
+        return delegate.size();
+    }
+
+    @Override
+    public Set<JobID> getRunningJobIds() {
+        mainThreadExecutor.assertRunningInMainThread();
+        return delegate.getRunningJobIds();
+    }
+
+    @Override
+    public Collection<JobManagerRunner> getJobManagerRunners() {
+        mainThreadExecutor.assertRunningInMainThread();
+        return delegate.getJobManagerRunners();
+    }
+
+    @Override
+    public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
+        mainThreadExecutor.assertRunningInMainThread();
+        return delegate.globalCleanupAsync(jobId, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) {
+        mainThreadExecutor.assertRunningInMainThread();
+        return delegate.localCleanupAsync(jobId, executor);
+    }
+
+    @Override
+    public JobManagerRunner unregister(JobID jobId) {
+        mainThreadExecutor.assertRunningInMainThread();
+        return delegate.unregister(jobId);
+    }
+
+    /**
+     * Returns the delegated {@link JobManagerRunnerRegistry}. This method can be used to workaround
+     * the main thread safeguard.
+     */
+    @Override
+    public JobManagerRunnerRegistry getWrappedDelegate() {
+        return this.delegate;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
index 3faa358..0957a42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
@@ -62,7 +62,7 @@ public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory
     }
 
     @VisibleForTesting
-    DispatcherResourceCleanerFactory(
+    public DispatcherResourceCleanerFactory(
             Executor cleanupExecutor,
             JobManagerRunnerRegistry jobManagerRunnerRegistry,
             JobGraphWriter jobGraphWriter,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
index f277e2b..75d638b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
@@ -26,18 +26,10 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.JobResultStore;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.JobGraphWriter;
 import org.apache.flink.runtime.jobmanager.StandaloneJobGraphStore;
-import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -53,11 +45,6 @@ import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ForkJoinPool;
-
 /** Abstract test for the {@link Dispatcher} component. */
 public class AbstractDispatcherTest extends TestLogger {
 
@@ -116,6 +103,19 @@ public class AbstractDispatcherTest extends TestLogger {
                 new BlobServer(configuration, temporaryFolder.newFolder(), new VoidBlobStore());
     }
 
+    protected TestingDispatcher.Builder createTestingDispatcherBuilder() {
+        return TestingDispatcher.builder()
+                .setRpcService(rpcService)
+                .setConfiguration(configuration)
+                .setHeartbeatServices(heartbeatServices)
+                .setHighAvailabilityServices(haServices)
+                .setJobGraphWriter(haServices.getJobGraphStore())
+                .setJobResultStore(haServices.getJobResultStore())
+                .setJobManagerRunnerFactory(JobMasterServiceLeadershipRunnerFactory.INSTANCE)
+                .setFatalErrorHandler(testingFatalErrorHandlerResource.getFatalErrorHandler())
+                .setBlobServer(blobServer);
+    }
+
     @After
     public void tearDown() throws Exception {
         if (haServices != null) {
@@ -129,115 +129,4 @@ public class AbstractDispatcherTest extends TestLogger {
     protected BlobServer getBlobServer() {
         return blobServer;
     }
-
-    /** A convenient builder for the {@link TestingDispatcher}. */
-    public class TestingDispatcherBuilder {
-
-        private Collection<JobGraph> initialJobGraphs = Collections.emptyList();
-
-        private Collection<JobResult> dirtyJobResults = Collections.emptyList();
-
-        private DispatcherBootstrapFactory dispatcherBootstrapFactory =
-                (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap();
-
-        private HeartbeatServices heartbeatServices = AbstractDispatcherTest.this.heartbeatServices;
-
-        private HighAvailabilityServices haServices = AbstractDispatcherTest.this.haServices;
-
-        private JobManagerRunnerFactory jobManagerRunnerFactory =
-                JobMasterServiceLeadershipRunnerFactory.INSTANCE;
-
-        private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
-
-        private JobResultStore jobResultStore = new EmbeddedJobResultStore();
-
-        private FatalErrorHandler fatalErrorHandler =
-                testingFatalErrorHandlerResource.getFatalErrorHandler();
-
-        private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE;
-
-        TestingDispatcherBuilder setHeartbeatServices(HeartbeatServices heartbeatServices) {
-            this.heartbeatServices = heartbeatServices;
-            return this;
-        }
-
-        TestingDispatcherBuilder setHaServices(HighAvailabilityServices haServices) {
-            this.haServices = haServices;
-            return this;
-        }
-
-        TestingDispatcherBuilder setInitialJobGraphs(Collection<JobGraph> initialJobGraphs) {
-            this.initialJobGraphs = initialJobGraphs;
-            return this;
-        }
-
-        TestingDispatcherBuilder setDirtyJobResults(Collection<JobResult> dirtyJobResults) {
-            this.dirtyJobResults = dirtyJobResults;
-            return this;
-        }
-
-        TestingDispatcherBuilder setDispatcherBootstrapFactory(
-                DispatcherBootstrapFactory dispatcherBootstrapFactory) {
-            this.dispatcherBootstrapFactory = dispatcherBootstrapFactory;
-            return this;
-        }
-
-        TestingDispatcherBuilder setJobManagerRunnerFactory(
-                JobManagerRunnerFactory jobManagerRunnerFactory) {
-            this.jobManagerRunnerFactory = jobManagerRunnerFactory;
-            return this;
-        }
-
-        TestingDispatcherBuilder setJobGraphWriter(JobGraphWriter jobGraphWriter) {
-            this.jobGraphWriter = jobGraphWriter;
-            return this;
-        }
-
-        TestingDispatcherBuilder setJobResultStore(JobResultStore jobResultStore) {
-            this.jobResultStore = jobResultStore;
-            return this;
-        }
-
-        public TestingDispatcherBuilder setFatalErrorHandler(FatalErrorHandler fatalErrorHandler) {
-            this.fatalErrorHandler = fatalErrorHandler;
-            return this;
-        }
-
-        public TestingDispatcherBuilder setHistoryServerArchivist(
-                HistoryServerArchivist historyServerArchivist) {
-            this.historyServerArchivist = historyServerArchivist;
-            return this;
-        }
-
-        TestingDispatcher build() throws Exception {
-            TestingResourceManagerGateway resourceManagerGateway =
-                    new TestingResourceManagerGateway();
-
-            final MemoryExecutionGraphInfoStore executionGraphInfoStore =
-                    new MemoryExecutionGraphInfoStore();
-
-            return new TestingDispatcher(
-                    rpcService,
-                    DispatcherId.generate(),
-                    initialJobGraphs,
-                    dirtyJobResults,
-                    dispatcherBootstrapFactory,
-                    new DispatcherServices(
-                            configuration,
-                            haServices,
-                            () -> CompletableFuture.completedFuture(resourceManagerGateway),
-                            blobServer,
-                            heartbeatServices,
-                            executionGraphInfoStore,
-                            fatalErrorHandler,
-                            historyServerArchivist,
-                            null,
-                            new DispatcherOperationCaches(),
-                            UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
-                            jobGraphWriter,
-                            jobResultStore,
-                            jobManagerRunnerFactory,
-                            ForkJoinPool.commonPool()));
-        }
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
index 6eca873..1d4cec9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.FlinkAssertions;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
 import org.apache.flink.util.FlinkException;
@@ -47,9 +46,7 @@ public class DefaultJobManagerRunnerRegistryTest {
 
     @BeforeEach
     public void setup() {
-        testInstance =
-                new DefaultJobManagerRunnerRegistry(
-                        4, ComponentMainThreadExecutorServiceAdapter.forMainThread());
+        testInstance = new DefaultJobManagerRunnerRegistry(4);
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
index fb9ac8a..6f3bcce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
@@ -241,13 +241,9 @@ public class DispatcherFailoverITCase extends AbstractDispatcherTest {
             }
         }
         final TestingDispatcher dispatcher =
-                new TestingDispatcherBuilder()
-                        .setJobManagerRunnerFactory(
-                                JobMasterServiceLeadershipRunnerFactory.INSTANCE)
-                        .setJobGraphWriter(haServices.getJobGraphStore())
-                        .setJobResultStore(haServices.getJobResultStore())
-                        .setInitialJobGraphs(jobGraphs)
-                        .setDirtyJobResults(haServices.getJobResultStore().getDirtyResults())
+                createTestingDispatcherBuilder()
+                        .setRecoveredJobs(jobGraphs)
+                        .setRecoveredDirtyJobs(haServices.getJobResultStore().getDirtyResults())
                         .setFatalErrorHandler(
                                 fatalErrorHandler == null
                                         ? testingFatalErrorHandlerResource.getFatalErrorHandler()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index a4063b6..127f98c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -22,34 +22,27 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobStore;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
-import org.apache.flink.runtime.blob.TestingBlobStore;
+import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
 import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.cleanup.TestingResourceCleanerFactory;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.JobResultEntry;
 import org.apache.flink.runtime.highavailability.JobResultStore;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
-import org.apache.flink.runtime.jobmanager.JobGraphWriter;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
-import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
 import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -60,8 +53,10 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.Reference;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.hamcrest.core.IsInstanceOf;
 import org.junit.After;
@@ -74,28 +69,23 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 
-import javax.annotation.Nullable;
-
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.Queue;
-import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.BiFunction;
 
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
+import static org.apache.flink.runtime.dispatcher.AbstractDispatcherTest.awaitStatus;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /** Tests the resource cleanup by the {@link Dispatcher}. */
@@ -117,30 +107,14 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
     private JobGraph jobGraph;
 
-    private Configuration configuration;
-
-    private JobResultStore jobResultStore;
-
-    private TestingHighAvailabilityServices highAvailabilityServices;
-
-    private OneShotLatch clearedJobLatch;
-
     private TestingDispatcher dispatcher;
 
     private DispatcherGateway dispatcherGateway;
 
     private BlobServer blobServer;
 
-    private PermanentBlobKey permanentBlobKey;
-
-    private File blobFile;
-
-    private CompletableFuture<BlobKey> storedHABlobFuture;
-    private CompletableFuture<JobID> deleteAllHABlobsFuture;
     private CompletableFuture<JobID> localCleanupFuture;
     private CompletableFuture<JobID> globalCleanupFuture;
-    private CompletableFuture<JobID> cleanupJobHADataFuture;
-    private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
 
     @BeforeClass
     public static void setupClass() {
@@ -152,41 +126,14 @@ public class DispatcherResourceCleanupTest extends TestLogger {
         jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
         jobId = jobGraph.getJobID();
 
-        configuration = new Configuration();
-
-        highAvailabilityServices = new TestingHighAvailabilityServices();
-        clearedJobLatch = new OneShotLatch();
-        jobResultStore = new SingleJobResultStore(jobId, clearedJobLatch);
-        highAvailabilityServices.setJobResultStore(jobResultStore);
-        cleanupJobHADataFuture = new CompletableFuture<>();
-        highAvailabilityServices.setGlobalCleanupFuture(cleanupJobHADataFuture);
-
-        storedHABlobFuture = new CompletableFuture<>();
-        deleteAllHABlobsFuture = new CompletableFuture<>();
-
-        final TestingBlobStore testingBlobStore =
-                new TestingBlobStoreBuilder()
-                        .setPutFunction(
-                                (file, jobId, blobKey) -> storedHABlobFuture.complete(blobKey))
-                        .setDeleteAllFunction(deleteAllHABlobsFuture::complete)
-                        .createTestingBlobStore();
-
         globalCleanupFuture = new CompletableFuture<>();
         localCleanupFuture = new CompletableFuture<>();
 
         blobServer =
-                new TestingBlobServer(
-                        configuration,
-                        temporaryFolder.newFolder(),
-                        testingBlobStore,
-                        (jobId, ignoredExecutor) -> {
-                            globalCleanupFuture.complete(jobId);
-                            return FutureUtils.completedVoidFuture();
-                        },
-                        (jobId, ignoredExecutor) -> {
-                            localCleanupFuture.complete(jobId);
-                            return FutureUtils.completedVoidFuture();
-                        });
+                BlobUtils.createBlobServer(
+                        new Configuration(),
+                        Reference.owned(temporaryFolder.newFolder()),
+                        new TestingBlobStoreBuilder().createTestingBlobStore());
     }
 
     private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob() throws Exception {
@@ -195,54 +142,72 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
     private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob(
             int numBlockingJobManagerRunners) throws Exception {
+        return startDispatcherAndSubmitJob(
+                createTestingDispatcherBuilder(), numBlockingJobManagerRunners);
+    }
+
+    private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob(
+            TestingDispatcher.Builder dispatcherBuilder, int numBlockingJobManagerRunners)
+            throws Exception {
         final TestingJobManagerRunnerFactory testingJobManagerRunnerFactoryNG =
                 new TestingJobManagerRunnerFactory(numBlockingJobManagerRunners);
-        startDispatcher(testingJobManagerRunnerFactoryNG);
+        startDispatcher(dispatcherBuilder, testingJobManagerRunnerFactoryNG);
         submitJobAndWait();
 
         return testingJobManagerRunnerFactoryNG;
     }
 
     private void startDispatcher(JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
-        TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
-        final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
-        final MemoryExecutionGraphInfoStore archivedExecutionGraphStore =
-                new MemoryExecutionGraphInfoStore();
-        dispatcher =
-                new TestingDispatcher(
-                        rpcService,
-                        DispatcherId.generate(),
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        (dispatcher, scheduledExecutor, errorHandler) ->
-                                new NoOpDispatcherBootstrap(),
-                        new DispatcherServices(
-                                configuration,
-                                highAvailabilityServices,
-                                () -> CompletableFuture.completedFuture(resourceManagerGateway),
-                                blobServer,
-                                heartbeatServices,
-                                archivedExecutionGraphStore,
-                                testingFatalErrorHandlerResource.getFatalErrorHandler(),
-                                VoidHistoryServerArchivist.INSTANCE,
-                                null,
-                                new DispatcherOperationCaches(),
-                                UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
-                                jobGraphWriter,
-                                jobResultStore,
-                                jobManagerRunnerFactory,
-                                ForkJoinPool.commonPool()));
+        startDispatcher(createTestingDispatcherBuilder(), jobManagerRunnerFactory);
+    }
+
+    private void startDispatcher(
+            TestingDispatcher.Builder dispatcherBuilder,
+            JobManagerRunnerFactory jobManagerRunnerFactory)
+            throws Exception {
+        dispatcher = dispatcherBuilder.setJobManagerRunnerFactory(jobManagerRunnerFactory).build();
 
         dispatcher.start();
 
         dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
     }
 
+    private TestingDispatcher.Builder createTestingDispatcherBuilder() {
+        final JobManagerRunnerRegistry jobManagerRunnerRegistry =
+                new DefaultJobManagerRunnerRegistry(2);
+        return TestingDispatcher.builder()
+                .setRpcService(rpcService)
+                .setBlobServer(blobServer)
+                .setJobManagerRunnerRegistry(jobManagerRunnerRegistry)
+                .setFatalErrorHandler(testingFatalErrorHandlerResource.getFatalErrorHandler())
+                .setResourceCleanerFactory(
+                        TestingResourceCleanerFactory.builder()
+                                // JobManagerRunnerRegistry needs to be added explicitly
+                                // because cleaning it will trigger the closeAsync latch
+                                // provided by TestingJobManagerRunner
+                                .withLocallyCleanableResource(jobManagerRunnerRegistry)
+                                .withGloballyCleanableResource(
+                                        (jobId, ignoredExecutor) -> {
+                                            globalCleanupFuture.complete(jobId);
+                                            return FutureUtils.completedVoidFuture();
+                                        })
+                                .withLocallyCleanableResource(
+                                        (jobId, ignoredExecutor) -> {
+                                            localCleanupFuture.complete(jobId);
+                                            return FutureUtils.completedVoidFuture();
+                                        })
+                                .build());
+    }
+
     @After
     public void teardown() throws Exception {
         if (dispatcher != null) {
             dispatcher.close();
         }
+
+        if (blobServer != null) {
+            blobServer.close();
+        }
     }
 
     @AfterClass
@@ -253,41 +218,29 @@ public class DispatcherResourceCleanupTest extends TestLogger {
     }
 
     @Test
-    public void testBlobServerCleanupWhenJobFinished() throws Exception {
+    public void testGlobalCleanupWhenJobFinished() throws Exception {
         final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
                 startDispatcherAndSubmitJob();
 
         // complete the job
         finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
 
-        assertThatHABlobsHaveBeenRemoved();
+        assertGlobalCleanupTriggered(jobId);
     }
 
-    private void assertThatHABlobsHaveBeenRemoved()
-            throws InterruptedException, ExecutionException, TimeoutException {
-        assertGlobalCleanupTriggered(jobId);
+    @Test
+    public void testGlobalCleanupWhenJobCanceled() throws Exception {
+        final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
+                startDispatcherAndSubmitJob();
 
-        // verify that we also cleared the BlobStore
-        assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
+        // complete the job
+        cancelJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
 
-        assertThat(blobFile.exists(), is(false));
+        assertGlobalCleanupTriggered(jobId);
     }
 
     private CompletableFuture<Acknowledge> submitJob() {
-        try {
-            // upload a blob to the blob server
-            permanentBlobKey = blobServer.putPermanent(jobId, new byte[256]);
-            jobGraph.addUserJarBlobKey(permanentBlobKey);
-            blobFile = blobServer.getStorageLocation(jobId, permanentBlobKey);
-
-            assertThat(blobFile.exists(), is(true));
-
-            // verify that we stored the blob also in the BlobStore
-            assertThat(storedHABlobFuture.join(), equalTo(permanentBlobKey));
-            return dispatcherGateway.submitJob(jobGraph, timeout);
-        } catch (IOException ioe) {
-            return FutureUtils.completedExceptionally(ioe);
-        }
+        return dispatcherGateway.submitJob(jobGraph, timeout);
     }
 
     private void submitJobAndWait() {
@@ -295,7 +248,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
     }
 
     @Test
-    public void testBlobServerCleanupWhenJobNotFinished() throws Exception {
+    public void testLocalCleanupWhenJobNotFinished() throws Exception {
         final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
                 startDispatcherAndSubmitJob();
 
@@ -305,22 +258,10 @@ public class DispatcherResourceCleanupTest extends TestLogger {
         suspendJob(testingJobManagerRunner);
 
         assertLocalCleanupTriggered(jobId);
-        assertThat(blobFile.exists(), is(false));
-
-        // verify that we did not clear the BlobStore
-        try {
-            deleteAllHABlobsFuture.get(50L, TimeUnit.MILLISECONDS);
-            fail("We should not delete the HA blobs.");
-        } catch (TimeoutException ignored) {
-            // expected
-        }
-
-        assertThat(deleteAllHABlobsFuture.isDone(), is(false));
     }
 
-    /** Tests that the uploaded blobs are being cleaned up in case of a job submission failure. */
     @Test
-    public void testBlobServerCleanupWhenJobSubmissionFails() throws Exception {
+    public void testGlobalCleanupWhenJobSubmissionFails() throws Exception {
         startDispatcher(new FailingJobManagerRunnerFactory(new FlinkException("Test exception")));
         final CompletableFuture<Acknowledge> submissionFuture = submitJob();
 
@@ -328,34 +269,23 @@ public class DispatcherResourceCleanupTest extends TestLogger {
             submissionFuture.get();
             fail("Job submission was expected to fail.");
         } catch (ExecutionException ee) {
-            assertThat(ee, FlinkMatchers.containsCause(JobSubmissionException.class));
+            assertThat(ee, containsCause(JobSubmissionException.class));
         }
 
-        assertThatHABlobsHaveBeenRemoved();
+        assertGlobalCleanupTriggered(jobId);
     }
 
     @Test
-    public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
+    public void testLocalCleanupWhenClosingDispatcher() throws Exception {
         startDispatcherAndSubmitJob();
 
         dispatcher.closeAsync().get();
 
         assertLocalCleanupTriggered(jobId);
-        assertThat(blobFile.exists(), is(false));
-
-        // verify that we did not clear the BlobStore
-        try {
-            deleteAllHABlobsFuture.get(50L, TimeUnit.MILLISECONDS);
-            fail("We should not delete the HA blobs.");
-        } catch (TimeoutException ignored) {
-            // expected
-        }
-
-        assertThat(deleteAllHABlobsFuture.isDone(), is(false));
     }
 
     @Test
-    public void testHACleanupWhenJobFinishedWhileClosingDispatcher() throws Exception {
+    public void testGlobalCleanupWhenJobFinishedWhileClosingDispatcher() throws Exception {
         final TestingJobManagerRunner testingJobManagerRunner =
                 TestingJobManagerRunner.newBuilder()
                         .setBlockingTermination(true)
@@ -384,38 +314,89 @@ public class DispatcherResourceCleanupTest extends TestLogger {
         dispatcherTerminationFuture.get();
 
         assertGlobalCleanupTriggered(jobId);
-        assertThat(deleteAllHABlobsFuture.get(), is(jobId));
     }
 
-    /**
-     * Tests that the {@link JobResultStore} entries are marked as clean after the job reached a
-     * terminal state.
-     */
     @Test
-    public void testJobResultStoreCleanup() throws Exception {
+    public void testJobBeingMarkedAsDirtyBeforeCleanup() throws Exception {
+        final OneShotLatch markAsDirtyLatch = new OneShotLatch();
+
+        final TestingDispatcher.Builder dispatcherBuilder =
+                createTestingDispatcherBuilder()
+                        .setJobResultStore(
+                                TestingJobResultStore.builder()
+                                        .withCreateDirtyResultConsumer(
+                                                ignoredJobResultEntry -> {
+                                                    try {
+                                                        markAsDirtyLatch.await();
+                                                    } catch (InterruptedException e) {
+                                                        throw new RuntimeException(e);
+                                                    }
+                                                })
+                                        .build());
+
         final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
-                startDispatcherAndSubmitJob();
+                startDispatcherAndSubmitJob(dispatcherBuilder, 0);
 
-        final JobResult jobResult =
-                TestingJobResultStore.createJobResult(jobId, ApplicationStatus.UNKNOWN);
+        finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
 
-        jobResultStore.createDirtyResult(new JobResultEntry(jobResult));
-        assertTrue(jobResultStore.hasJobResultEntry(jobId));
+        assertThatNoCleanupWasTriggered();
 
-        final TestingJobManagerRunner testingJobManagerRunner =
-                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
-        testingJobManagerRunner.completeResultFuture(
-                new ExecutionGraphInfo(
-                        new ArchivedExecutionGraphBuilder()
-                                .setState(JobStatus.FINISHED)
-                                .setJobID(jobId)
-                                .build()));
+        markAsDirtyLatch.trigger();
 
-        // wait for the clearing
-        clearedJobLatch.await();
+        assertGlobalCleanupTriggered(jobId);
+    }
 
-        assertTrue(jobResultStore.hasJobResultEntry(jobId));
-        assertTrue(jobResultStore.getDirtyResults().isEmpty());
+    @Test
+    public void testJobBeingMarkedAsCleanAfterCleanup() throws Exception {
+        final CompletableFuture<JobID> markAsCleanFuture = new CompletableFuture<>();
+
+        final JobResultStore jobResultStore =
+                TestingJobResultStore.builder()
+                        .withMarkResultAsCleanConsumer(markAsCleanFuture::complete)
+                        .build();
+        final OneShotLatch localCleanupLatch = new OneShotLatch();
+        final OneShotLatch globalCleanupLatch = new OneShotLatch();
+        final TestingResourceCleanerFactory resourceCleanerFactory =
+                TestingResourceCleanerFactory.builder()
+                        .withLocallyCleanableResource(
+                                (ignoredJobId, ignoredExecutor) -> {
+                                    try {
+                                        localCleanupLatch.await();
+                                    } catch (InterruptedException e) {
+                                        throw new RuntimeException(e);
+                                    }
+
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .withGloballyCleanableResource(
+                                (ignoredJobId, ignoredExecutor) -> {
+                                    try {
+                                        globalCleanupLatch.await();
+                                    } catch (InterruptedException e) {
+                                        throw new RuntimeException(e);
+                                    }
+
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+
+        final TestingDispatcher.Builder dispatcherBuilder =
+                createTestingDispatcherBuilder()
+                        .setJobResultStore(jobResultStore)
+                        .setResourceCleanerFactory(resourceCleanerFactory);
+
+        final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
+                startDispatcherAndSubmitJob(dispatcherBuilder, 0);
+
+        finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
+
+        assertThat(markAsCleanFuture.isDone(), is(false));
+
+        localCleanupLatch.trigger();
+        assertThat(markAsCleanFuture.isDone(), is(false));
+        globalCleanupLatch.trigger();
+
+        assertThat(markAsCleanFuture.get(), is(jobId));
     }
 
     /**
@@ -481,32 +462,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
             finishJob(testingJobManagerRunnerFactoryNG.takeCreatedJobManagerRunner());
         }
 
-        assertThatHABlobsHaveBeenRemoved();
-    }
-
-    @Test
-    public void testHaDataCleanupWhenJobFinished() throws Exception {
-        TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
-        TestingJobManagerRunner jobManagerRunner =
-                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
-        finishJob(jobManagerRunner);
-        JobID jobID = cleanupJobHADataFuture.get(2000, TimeUnit.MILLISECONDS);
-        assertThat(jobID, is(this.jobId));
-    }
-
-    @Test
-    public void testHaDataCleanupWhenJobNotFinished() throws Exception {
-        TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
-        TestingJobManagerRunner jobManagerRunner =
-                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
-        suspendJob(jobManagerRunner);
-        try {
-            cleanupJobHADataFuture.get(10L, TimeUnit.MILLISECONDS);
-            fail("We should not delete the HA data for job.");
-        } catch (TimeoutException ignored) {
-            // expected
-        }
-        assertThat(cleanupJobHADataFuture.isDone(), is(false));
+        assertGlobalCleanupTriggered(jobId);
     }
 
     private void finishJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
@@ -517,6 +473,10 @@ public class DispatcherResourceCleanupTest extends TestLogger {
         terminateJobWithState(takeCreatedJobManagerRunner, JobStatus.SUSPENDED);
     }
 
+    private void cancelJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
+        terminateJobWithState(takeCreatedJobManagerRunner, JobStatus.CANCELED);
+    }
+
     private void terminateJobWithState(
             TestingJobManagerRunner takeCreatedJobManagerRunner, JobStatus state) {
         takeCreatedJobManagerRunner.completeResultFuture(
@@ -530,8 +490,6 @@ public class DispatcherResourceCleanupTest extends TestLogger {
     private void assertThatNoCleanupWasTriggered() {
         assertThat(globalCleanupFuture.isDone(), is(false));
         assertThat(localCleanupFuture.isDone(), is(false));
-        assertThat(deleteAllHABlobsFuture.isDone(), is(false));
-        assertThat(blobFile.exists(), is(true));
     }
 
     @Test
@@ -566,84 +524,6 @@ public class DispatcherResourceCleanupTest extends TestLogger {
         dispatcherTerminationFuture.get();
     }
 
-    private static final class SingleJobResultStore implements JobResultStore {
-
-        private final JobID expectedJobId;
-        @Nullable private JobResultEntry actualJobResultEntry;
-        private boolean isDirty = true;
-
-        private final OneShotLatch clearedJobLatch;
-
-        private SingleJobResultStore(JobID expectedJobId, OneShotLatch clearedJobLatch) {
-            this.expectedJobId = expectedJobId;
-            this.clearedJobLatch = clearedJobLatch;
-        }
-
-        @Override
-        public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
-            checkJobId(jobResultEntry.getJobId());
-            this.actualJobResultEntry = jobResultEntry;
-        }
-
-        private void checkJobId(JobID jobID) {
-            Preconditions.checkArgument(expectedJobId.equals(jobID));
-        }
-
-        @Override
-        public void markResultAsClean(JobID jobId) throws IOException {
-            checkJobId(jobId);
-            Preconditions.checkNotNull(actualJobResultEntry);
-            isDirty = false;
-            clearedJobLatch.trigger();
-        }
-
-        @Override
-        public boolean hasDirtyJobResultEntry(JobID jobId) throws IOException {
-            if (actualJobResultEntry == null) {
-                return false;
-            }
-
-            checkJobId(jobId);
-            return isDirty;
-        }
-
-        @Override
-        public boolean hasCleanJobResultEntry(JobID jobId) throws IOException {
-            if (actualJobResultEntry == null) {
-                return false;
-            }
-
-            checkJobId(jobId);
-            return !isDirty;
-        }
-
-        @Override
-        public Set<JobResult> getDirtyResults() throws IOException {
-            return actualJobResultEntry != null && isDirty
-                    ? Collections.singleton(actualJobResultEntry.getJobResult())
-                    : Collections.emptySet();
-        }
-    }
-
-    @Test
-    public void testHABlobsAreRemovedIfHAJobGraphRemovalSucceeds() throws Exception {
-        final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
-                startDispatcherAndSubmitJob();
-
-        ArchivedExecutionGraph executionGraph =
-                new ArchivedExecutionGraphBuilder()
-                        .setJobID(jobId)
-                        .setState(JobStatus.CANCELED)
-                        .build();
-
-        final TestingJobManagerRunner testingJobManagerRunner =
-                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
-        testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
-
-        assertGlobalCleanupTriggered(jobId);
-        assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
-    }
-
     private void assertLocalCleanupTriggered(JobID jobId)
             throws ExecutionException, InterruptedException, TimeoutException {
         assertThat(localCleanupFuture.get(100, TimeUnit.MILLISECONDS), equalTo(jobId));
@@ -658,17 +538,17 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
     @Test
     public void testFatalErrorIfJobCannotBeMarkedDirtyInJobResultStore() throws Exception {
-        jobResultStore =
+        final JobResultStore jobResultStore =
                 TestingJobResultStore.builder()
                         .withCreateDirtyResultConsumer(
                                 jobResult -> {
                                     throw new IOException("Expected IOException.");
                                 })
                         .build();
-        highAvailabilityServices.setJobResultStore(jobResultStore);
 
         final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
-                startDispatcherAndSubmitJob();
+                startDispatcherAndSubmitJob(
+                        createTestingDispatcherBuilder().setJobResultStore(jobResultStore), 0);
 
         ArchivedExecutionGraph executionGraph =
                 new ArchivedExecutionGraphBuilder()
@@ -691,7 +571,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
     @Test
     public void testErrorHandlingIfJobCannotBeMarkedAsCleanInJobResultStore() throws Exception {
         final CompletableFuture<JobResultEntry> dirtyJobFuture = new CompletableFuture<>();
-        jobResultStore =
+        final JobResultStore jobResultStore =
                 TestingJobResultStore.builder()
                         .withCreateDirtyResultConsumer(dirtyJobFuture::complete)
                         .withMarkResultAsCleanConsumer(
@@ -699,10 +579,10 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                                     throw new IOException("Expected IOException.");
                                 })
                         .build();
-        highAvailabilityServices.setJobResultStore(jobResultStore);
 
         final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
-                startDispatcherAndSubmitJob();
+                startDispatcherAndSubmitJob(
+                        createTestingDispatcherBuilder().setJobResultStore(jobResultStore), 0);
 
         ArchivedExecutionGraph executionGraph =
                 new ArchivedExecutionGraphBuilder()
@@ -729,45 +609,108 @@ public class DispatcherResourceCleanupTest extends TestLogger {
         assertThat(dirtyJobFuture.get().getJobId(), is(jobId));
     }
 
-    private static final class TestingBlobServer extends BlobServer {
-
-        private final BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupFunction;
-        private final BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupFunction;
-
-        /**
-         * Instantiates a new BLOB server and binds it to a free network port.
-         *
-         * @param config Configuration to be used to instantiate the BlobServer
-         * @param blobStore BlobStore to store blobs persistently
-         * @param globalCleanupFunction The function called along the actual {@link
-         *     #globalCleanupAsync(JobID, Executor)} call.
-         * @param localCleanupFunction The function called along the actual {@link
-         *     #localCleanupAsync(JobID, Executor)} call.
-         * @throws IOException thrown if the BLOB server cannot bind to a free network port or if
-         *     the (local or distributed) file storage cannot be created or is not usable
-         */
-        public TestingBlobServer(
-                Configuration config,
-                File storageDirectory,
-                BlobStore blobStore,
-                BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupFunction,
-                BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupFunction)
-                throws IOException {
-            super(config, storageDirectory, blobStore);
-            this.globalCleanupFunction = globalCleanupFunction;
-            this.localCleanupFunction = localCleanupFunction;
+    /** Tests that a failing {@link JobManagerRunner} will be properly cleaned up. */
+    @Test
+    public void testFailingJobManagerRunnerCleanup() throws Exception {
+        final FlinkException testException = new FlinkException("Test exception.");
+        final ArrayBlockingQueue<Optional<Exception>> queue = new ArrayBlockingQueue<>(2);
+
+        final BlockingJobManagerRunnerFactory blockingJobManagerRunnerFactory =
+                new BlockingJobManagerRunnerFactory(
+                        () -> {
+                            final Optional<Exception> maybeException = queue.take();
+                            if (maybeException.isPresent()) {
+                                throw maybeException.get();
+                            }
+                        });
+
+        startDispatcher(blockingJobManagerRunnerFactory);
+
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        // submit and fail during job master runner construction
+        queue.offer(Optional.of(testException));
+        try {
+            dispatcherGateway.submitJob(jobGraph, Time.minutes(1)).get();
+            fail("A FlinkException is expected");
+        } catch (Throwable expectedException) {
+            assertThat(expectedException, containsCause(FlinkException.class));
+            assertThat(expectedException, containsMessage(testException.getMessage()));
+            // make sure we've cleaned up in correct order (including HA)
+            assertGlobalCleanupTriggered(jobId);
         }
 
-        @Override
-        public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
-            return super.globalCleanupAsync(jobId, executor)
-                    .thenCompose(ignored -> globalCleanupFunction.apply(jobId, executor));
+        // don't fail this time
+        queue.offer(Optional.empty());
+        // submit job again
+        dispatcherGateway.submitJob(jobGraph, Time.minutes(1L)).get();
+        blockingJobManagerRunnerFactory.setJobStatus(JobStatus.RUNNING);
+
+        // Ensure job is running
+        awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
+    }
+
+    private static final class BlockingJobManagerRunnerFactory
+            extends TestingJobManagerRunnerFactory {
+
+        private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch;
+        private TestingJobManagerRunner testingRunner;
+
+        BlockingJobManagerRunnerFactory(ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
+            this.jobManagerRunnerCreationLatch = jobManagerRunnerCreationLatch;
         }
 
         @Override
-        public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) {
-            return super.localCleanupAsync(jobId, executor)
-                    .thenCompose(ignored -> localCleanupFunction.apply(jobId, executor));
+        public TestingJobManagerRunner createJobManagerRunner(
+                JobGraph jobGraph,
+                Configuration configuration,
+                RpcService rpcService,
+                HighAvailabilityServices highAvailabilityServices,
+                HeartbeatServices heartbeatServices,
+                JobManagerSharedServices jobManagerSharedServices,
+                JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+                FatalErrorHandler fatalErrorHandler,
+                long initializationTimestamp)
+                throws Exception {
+            jobManagerRunnerCreationLatch.run();
+
+            this.testingRunner =
+                    super.createJobManagerRunner(
+                            jobGraph,
+                            configuration,
+                            rpcService,
+                            highAvailabilityServices,
+                            heartbeatServices,
+                            jobManagerSharedServices,
+                            jobManagerJobMetricGroupFactory,
+                            fatalErrorHandler,
+                            initializationTimestamp);
+
+            TestingJobMasterGateway testingJobMasterGateway =
+                    new TestingJobMasterGatewayBuilder()
+                            .setRequestJobSupplier(
+                                    () ->
+                                            CompletableFuture.completedFuture(
+                                                    new ExecutionGraphInfo(
+                                                            ArchivedExecutionGraph
+                                                                    .createFromInitializingJob(
+                                                                            jobGraph.getJobID(),
+                                                                            jobGraph.getName(),
+                                                                            JobStatus.RUNNING,
+                                                                            null,
+                                                                            null,
+                                                                            1337))))
+                            .build();
+            testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway);
+            return testingRunner;
+        }
+
+        public void setJobStatus(JobStatus newStatus) {
+            Preconditions.checkState(
+                    testingRunner != null,
+                    "JobManagerRunner must be created before this method is available");
+            this.testingRunner.setJobStatus(newStatus);
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 6f4898a..bad61cd 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -59,7 +59,6 @@ import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceProce
 import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
 import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
-import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -89,7 +88,6 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TimeUtils;
 import org.apache.flink.util.concurrent.FutureUtils;
-import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.assertj.core.api.Assertions;
 import org.hamcrest.Matchers;
@@ -110,11 +108,9 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayDeque;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Optional;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -127,12 +123,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -178,8 +172,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
             JobManagerRunnerFactory jobManagerRunnerFactory)
             throws Exception {
         final TestingDispatcher dispatcher =
-                new TestingDispatcherBuilder()
-                        .setHaServices(haServices)
+                createTestingDispatcherBuilder()
+                        .setHighAvailabilityServices(haServices)
                         .setHeartbeatServices(heartbeatServices)
                         .setJobManagerRunnerFactory(jobManagerRunnerFactory)
                         .setJobGraphWriter(haServices.getJobGraphStore())
@@ -246,11 +240,11 @@ public class DispatcherTest extends AbstractDispatcherTest {
     @Test
     public void testDuplicateJobSubmissionWithRunningJobId() throws Exception {
         dispatcher =
-                new TestingDispatcherBuilder()
+                createTestingDispatcherBuilder()
                         .setJobManagerRunnerFactory(
                                 new ExpectedJobIdJobManagerRunnerFactory(
                                         jobId, createdJobManagerRunnerLatch))
-                        .setInitialJobGraphs(Collections.singleton(jobGraph))
+                        .setRecoveredJobs(Collections.singleton(jobGraph))
                         .build();
         dispatcher.start();
         final DispatcherGateway dispatcherGateway =
@@ -474,7 +468,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
         final CompletableFuture<JobManagerRunnerResult> jobTerminationFuture =
                 new CompletableFuture<>();
         dispatcher =
-                new TestingDispatcherBuilder()
+                createTestingDispatcherBuilder()
                         .setJobManagerRunnerFactory(
                                 new FinishingJobManagerRunnerFactory(
                                         jobTerminationFuture, () -> {}))
@@ -675,10 +669,9 @@ public class DispatcherTest extends AbstractDispatcherTest {
         final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
                 new TestingJobManagerRunnerFactory();
         dispatcher =
-                new TestingDispatcherBuilder()
+                createTestingDispatcherBuilder()
                         .setJobManagerRunnerFactory(jobManagerRunnerFactory)
-                        .setInitialJobGraphs(
-                                Collections.singleton(JobGraphTestUtils.emptyJobGraph()))
+                        .setRecoveredJobs(Collections.singleton(JobGraphTestUtils.emptyJobGraph()))
                         .build();
 
         dispatcher.start();
@@ -721,89 +714,12 @@ public class DispatcherTest extends AbstractDispatcherTest {
         final JobResult jobResult =
                 TestingJobResultStore.createSuccessfulJobResult(jobGraph.getJobID());
         dispatcher =
-                new TestingDispatcherBuilder()
-                        .setInitialJobGraphs(Collections.singleton(jobGraph))
-                        .setDirtyJobResults(Collections.singleton(jobResult))
+                createTestingDispatcherBuilder()
+                        .setRecoveredJobs(Collections.singleton(jobGraph))
+                        .setRecoveredDirtyJobs(Collections.singleton(jobResult))
                         .build();
     }
 
-    /** Tests that a failing {@link JobManagerRunner} will be properly cleaned up. */
-    @Test
-    public void testFailingJobManagerRunnerCleanup() throws Exception {
-        final FlinkException testException = new FlinkException("Test exception.");
-        final ArrayBlockingQueue<Optional<Exception>> queue = new ArrayBlockingQueue<>(2);
-
-        final BlockingJobManagerRunnerFactory blockingJobManagerRunnerFactory =
-                new BlockingJobManagerRunnerFactory(
-                        () -> {
-                            final Optional<Exception> maybeException = queue.take();
-                            if (maybeException.isPresent()) {
-                                throw maybeException.get();
-                            }
-                        });
-
-        final BlockingQueue<String> cleanUpEvents = new LinkedBlockingQueue<>();
-
-        // Track cleanup - ha-services
-        final CompletableFuture<JobID> cleanupJobData = new CompletableFuture<>();
-        haServices.setGlobalCleanupFuture(cleanupJobData);
-        cleanupJobData.thenAccept(jobId -> cleanUpEvents.add(CLEANUP_HA_SERVICES));
-
-        // Track cleanup - job-graph
-        final TestingJobGraphStore jobGraphStore =
-                TestingJobGraphStore.newBuilder()
-                        .setLocalCleanupFunction(
-                                (jobId, executor) -> {
-                                    cleanUpEvents.add(CLEANUP_JOB_GRAPH_RELEASE);
-                                    return FutureUtils.completedVoidFuture();
-                                })
-                        .setGlobalCleanupFunction(
-                                (jobId, executor) -> {
-                                    cleanUpEvents.add(CLEANUP_JOB_GRAPH_REMOVE);
-                                    return FutureUtils.completedVoidFuture();
-                                })
-                        .build();
-        jobGraphStore.start(null);
-        haServices.setJobGraphStore(jobGraphStore);
-
-        // Track cleanup - job result store
-        haServices.setJobResultStore(
-                TestingJobResultStore.builder()
-                        .withMarkResultAsCleanConsumer(
-                                jobID -> cleanUpEvents.add(CLEANUP_JOB_RESULT_STORE))
-                        .build());
-
-        dispatcher =
-                createAndStartDispatcher(
-                        heartbeatServices, haServices, blockingJobManagerRunnerFactory);
-
-        final DispatcherGateway dispatcherGateway =
-                dispatcher.getSelfGateway(DispatcherGateway.class);
-
-        // submit and fail during job master runner construction
-        queue.offer(Optional.of(testException));
-        try {
-            dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
-            fail("A FlinkException is expected");
-        } catch (Throwable expectedException) {
-            assertThat(expectedException, containsCause(FlinkException.class));
-            assertThat(expectedException, containsMessage(testException.getMessage()));
-            // make sure we've cleaned up in correct order (including HA)
-            assertThat(
-                    new ArrayList<>(cleanUpEvents),
-                    containsInAnyOrder(CLEANUP_JOB_GRAPH_REMOVE, CLEANUP_HA_SERVICES));
-        }
-
-        // don't fail this time
-        queue.offer(Optional.empty());
-        // submit job again
-        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
-        blockingJobManagerRunnerFactory.setJobStatus(JobStatus.RUNNING);
-
-        // Ensure job is running
-        awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
-    }
-
     @Test
     public void testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception {
         final TestingJobGraphStore submittedJobGraphStore =
@@ -812,7 +728,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
         haServices.setJobGraphStore(submittedJobGraphStore);
 
         dispatcher =
-                new TestingDispatcherBuilder().setJobGraphWriter(submittedJobGraphStore).build();
+                createTestingDispatcherBuilder().setJobGraphWriter(submittedJobGraphStore).build();
 
         dispatcher.start();
 
@@ -930,8 +846,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
         testingJobGraphStore.start(null);
 
         dispatcher =
-                new TestingDispatcherBuilder()
-                        .setInitialJobGraphs(Collections.singleton(jobGraph))
+                createTestingDispatcherBuilder()
+                        .setRecoveredJobs(Collections.singleton(jobGraph))
                         .setJobGraphWriter(testingJobGraphStore)
                         .build();
         dispatcher.start();
@@ -1110,8 +1026,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
         final PermanentBlobKey blobKey2 = blobServer.putPermanent(jobId2, fileContent);
 
         dispatcher =
-                new TestingDispatcherBuilder()
-                        .setInitialJobGraphs(Collections.singleton(new JobGraph(jobId1, "foobar")))
+                createTestingDispatcherBuilder()
+                        .setRecoveredJobs(Collections.singleton(new JobGraph(jobId1, "foobar")))
                         .build();
 
         Assertions.assertThat(blobServer.getFile(jobId1, blobKey1)).hasBinaryContent(fileContent);
@@ -1365,70 +1281,6 @@ public class DispatcherTest extends AbstractDispatcherTest {
         }
     }
 
-    private static final class BlockingJobManagerRunnerFactory
-            extends TestingJobManagerRunnerFactory {
-
-        @Nonnull private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch;
-        private TestingJobManagerRunner testingRunner;
-
-        BlockingJobManagerRunnerFactory(
-                @Nonnull ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
-            this.jobManagerRunnerCreationLatch = jobManagerRunnerCreationLatch;
-        }
-
-        @Override
-        public TestingJobManagerRunner createJobManagerRunner(
-                JobGraph jobGraph,
-                Configuration configuration,
-                RpcService rpcService,
-                HighAvailabilityServices highAvailabilityServices,
-                HeartbeatServices heartbeatServices,
-                JobManagerSharedServices jobManagerSharedServices,
-                JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
-                FatalErrorHandler fatalErrorHandler,
-                long initializationTimestamp)
-                throws Exception {
-            jobManagerRunnerCreationLatch.run();
-
-            this.testingRunner =
-                    super.createJobManagerRunner(
-                            jobGraph,
-                            configuration,
-                            rpcService,
-                            highAvailabilityServices,
-                            heartbeatServices,
-                            jobManagerSharedServices,
-                            jobManagerJobMetricGroupFactory,
-                            fatalErrorHandler,
-                            initializationTimestamp);
-
-            TestingJobMasterGateway testingJobMasterGateway =
-                    new TestingJobMasterGatewayBuilder()
-                            .setRequestJobSupplier(
-                                    () ->
-                                            CompletableFuture.completedFuture(
-                                                    new ExecutionGraphInfo(
-                                                            ArchivedExecutionGraph
-                                                                    .createFromInitializingJob(
-                                                                            jobGraph.getJobID(),
-                                                                            jobGraph.getName(),
-                                                                            JobStatus.RUNNING,
-                                                                            null,
-                                                                            null,
-                                                                            1337))))
-                            .build();
-            testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway);
-            return testingRunner;
-        }
-
-        public void setJobStatus(JobStatus newStatus) {
-            Preconditions.checkState(
-                    testingRunner != null,
-                    "JobManagerRunner must be created before this method is available");
-            this.testingRunner.setJobStatus(newStatus);
-        }
-    }
-
     private static final class InitializationTimestampCapturingJobManagerRunnerFactory
             implements JobManagerRunnerFactory {
         private final BlockingQueue<Long> initializationTimestampQueue;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index ba5ecca..5c3efc7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -20,16 +20,39 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.dispatcher.cleanup.DispatcherResourceCleanerFactory;
+import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.JobResultStore;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphWriter;
 import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import java.util.function.Function;
 
 /** {@link Dispatcher} implementation used for testing purposes. */
@@ -56,6 +79,58 @@ class TestingDispatcher extends Dispatcher {
         this.startFuture = new CompletableFuture<>();
     }
 
+    private TestingDispatcher(
+            RpcService rpcService,
+            DispatcherId fencingToken,
+            Collection<JobGraph> recoveredJobs,
+            Collection<JobResult> recoveredDirtyJobs,
+            Configuration configuration,
+            HighAvailabilityServices highAvailabilityServices,
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            HeartbeatServices heartbeatServices,
+            BlobServer blobServer,
+            FatalErrorHandler fatalErrorHandler,
+            JobGraphWriter jobGraphWriter,
+            JobResultStore jobResultStore,
+            JobManagerMetricGroup jobManagerMetricGroup,
+            @Nullable String metricServiceQueryAddress,
+            Executor ioExecutor,
+            HistoryServerArchivist historyServerArchivist,
+            ExecutionGraphInfoStore executionGraphInfoStore,
+            JobManagerRunnerFactory jobManagerRunnerFactory,
+            DispatcherBootstrapFactory dispatcherBootstrapFactory,
+            DispatcherOperationCaches dispatcherOperationCaches,
+            JobManagerRunnerRegistry jobManagerRunnerRegistry,
+            ResourceCleanerFactory resourceCleanerFactory)
+            throws Exception {
+        super(
+                rpcService,
+                fencingToken,
+                recoveredJobs,
+                recoveredDirtyJobs,
+                dispatcherBootstrapFactory,
+                new DispatcherServices(
+                        configuration,
+                        highAvailabilityServices,
+                        resourceManagerGatewayRetriever,
+                        blobServer,
+                        heartbeatServices,
+                        executionGraphInfoStore,
+                        fatalErrorHandler,
+                        historyServerArchivist,
+                        metricServiceQueryAddress,
+                        dispatcherOperationCaches,
+                        jobManagerMetricGroup,
+                        jobGraphWriter,
+                        jobResultStore,
+                        jobManagerRunnerFactory,
+                        ioExecutor),
+                jobManagerRunnerRegistry,
+                resourceCleanerFactory);
+
+        this.startFuture = new CompletableFuture<>();
+    }
+
     @Override
     public void onStart() throws Exception {
         try {
@@ -91,4 +166,210 @@ class TestingDispatcher extends Dispatcher {
     void waitUntilStarted() {
         startFuture.join();
     }
+
+    public static TestingDispatcher.Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private RpcService rpcService = new TestingRpcService();
+        private DispatcherId fencingToken = DispatcherId.generate();
+        private Collection<JobGraph> recoveredJobs = Collections.emptyList();
+        private Collection<JobResult> recoveredDirtyJobs = Collections.emptyList();
+        private HighAvailabilityServices highAvailabilityServices =
+                new TestingHighAvailabilityServices();
+
+        private TestingResourceManagerGateway resourceManagerGateway =
+                new TestingResourceManagerGateway();
+        private GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
+                () -> CompletableFuture.completedFuture(resourceManagerGateway);
+        private HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
+
+        private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
+        private JobResultStore jobResultStore = new EmbeddedJobResultStore();
+
+        private Configuration configuration = new Configuration();
+
+        // even-though it's labeled as @Nullable, it's a mandatory field that needs to be set before
+        // building the Dispatcher instance
+        @Nullable private BlobServer blobServer = null;
+        private FatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+        private JobManagerMetricGroup jobManagerMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup();
+        @Nullable private String metricServiceQueryAddress = null;
+        private Executor ioExecutor = ForkJoinPool.commonPool();
+        private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE;
+        private ExecutionGraphInfoStore executionGraphInfoStore =
+                new MemoryExecutionGraphInfoStore();
+        private JobManagerRunnerFactory jobManagerRunnerFactory =
+                new TestingJobManagerRunnerFactory(0);
+        private DispatcherBootstrapFactory dispatcherBootstrapFactory =
+                (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap();
+        private DispatcherOperationCaches dispatcherOperationCaches =
+                new DispatcherOperationCaches();
+        private JobManagerRunnerRegistry jobManagerRunnerRegistry =
+                new DefaultJobManagerRunnerRegistry(1);
+        @Nullable private ResourceCleanerFactory resourceCleanerFactory;
+
+        public Builder setRpcService(RpcService rpcService) {
+            this.rpcService = rpcService;
+            return this;
+        }
+
+        public Builder setFencingToken(DispatcherId fencingToken) {
+            this.fencingToken = fencingToken;
+            return this;
+        }
+
+        public Builder setRecoveredJobs(Collection<JobGraph> recoveredJobs) {
+            this.recoveredJobs = recoveredJobs;
+            return this;
+        }
+
+        public Builder setRecoveredDirtyJobs(Collection<JobResult> recoveredDirtyJobs) {
+            this.recoveredDirtyJobs = recoveredDirtyJobs;
+            return this;
+        }
+
+        public Builder setHighAvailabilityServices(
+                HighAvailabilityServices highAvailabilityServices) {
+            this.highAvailabilityServices = highAvailabilityServices;
+            return this;
+        }
+
+        public Builder setResourceManagerGateway(
+                TestingResourceManagerGateway resourceManagerGateway) {
+            this.resourceManagerGateway = resourceManagerGateway;
+            return this;
+        }
+
+        public Builder setResourceManagerGatewayRetriever(
+                GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {
+            this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
+            return this;
+        }
+
+        public Builder setHeartbeatServices(HeartbeatServices heartbeatServices) {
+            this.heartbeatServices = heartbeatServices;
+            return this;
+        }
+
+        public Builder setJobGraphWriter(JobGraphWriter jobGraphWriter) {
+            this.jobGraphWriter = jobGraphWriter;
+            return this;
+        }
+
+        public Builder setJobResultStore(JobResultStore jobResultStore) {
+            this.jobResultStore = jobResultStore;
+            return this;
+        }
+
+        public Builder setConfiguration(Configuration configuration) {
+            this.configuration = configuration;
+            return this;
+        }
+
+        public Builder setBlobServer(BlobServer blobServer) {
+            this.blobServer = blobServer;
+            return this;
+        }
+
+        public Builder setFatalErrorHandler(FatalErrorHandler fatalErrorHandler) {
+            this.fatalErrorHandler = fatalErrorHandler;
+            return this;
+        }
+
+        public Builder setJobManagerMetricGroup(JobManagerMetricGroup jobManagerMetricGroup) {
+            this.jobManagerMetricGroup = jobManagerMetricGroup;
+            return this;
+        }
+
+        public Builder setMetricServiceQueryAddress(@Nullable String metricServiceQueryAddress) {
+            this.metricServiceQueryAddress = metricServiceQueryAddress;
+            return this;
+        }
+
+        public Builder setIoExecutor(Executor ioExecutor) {
+            this.ioExecutor = ioExecutor;
+            return this;
+        }
+
+        public Builder setHistoryServerArchivist(HistoryServerArchivist historyServerArchivist) {
+            this.historyServerArchivist = historyServerArchivist;
+            return this;
+        }
+
+        public Builder setExecutionGraphInfoStore(ExecutionGraphInfoStore executionGraphInfoStore) {
+            this.executionGraphInfoStore = executionGraphInfoStore;
+            return this;
+        }
+
+        public Builder setJobManagerRunnerFactory(JobManagerRunnerFactory jobManagerRunnerFactory) {
+            this.jobManagerRunnerFactory = jobManagerRunnerFactory;
+            return this;
+        }
+
+        public Builder setDispatcherBootstrapFactory(
+                DispatcherBootstrapFactory dispatcherBootstrapFactory) {
+            this.dispatcherBootstrapFactory = dispatcherBootstrapFactory;
+            return this;
+        }
+
+        public Builder setDispatcherOperationCaches(
+                DispatcherOperationCaches dispatcherOperationCaches) {
+            this.dispatcherOperationCaches = dispatcherOperationCaches;
+            return this;
+        }
+
+        public Builder setJobManagerRunnerRegistry(
+                JobManagerRunnerRegistry jobManagerRunnerRegistry) {
+            this.jobManagerRunnerRegistry = jobManagerRunnerRegistry;
+            return this;
+        }
+
+        public Builder setResourceCleanerFactory(ResourceCleanerFactory resourceCleanerFactory) {
+            this.resourceCleanerFactory = resourceCleanerFactory;
+            return this;
+        }
+
+        private ResourceCleanerFactory createDefaultResourceCleanerFactory() {
+            return new DispatcherResourceCleanerFactory(
+                    ioExecutor,
+                    jobManagerRunnerRegistry,
+                    jobGraphWriter,
+                    blobServer,
+                    highAvailabilityServices,
+                    jobManagerMetricGroup);
+        }
+
+        public TestingDispatcher build() throws Exception {
+            return new TestingDispatcher(
+                    rpcService,
+                    fencingToken,
+                    recoveredJobs,
+                    recoveredDirtyJobs,
+                    configuration,
+                    highAvailabilityServices,
+                    resourceManagerGatewayRetriever,
+                    heartbeatServices,
+                    Preconditions.checkNotNull(
+                            blobServer,
+                            "No BlobServer is specified for building the TestingDispatcher"),
+                    fatalErrorHandler,
+                    jobGraphWriter,
+                    jobResultStore,
+                    jobManagerMetricGroup,
+                    metricServiceQueryAddress,
+                    ioExecutor,
+                    historyServerArchivist,
+                    executionGraphInfoStore,
+                    jobManagerRunnerFactory,
+                    dispatcherBootstrapFactory,
+                    dispatcherOperationCaches,
+                    jobManagerRunnerRegistry,
+                    resourceCleanerFactory != null
+                            ? resourceCleanerFactory
+                            : createDefaultResourceCleanerFactory());
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java
new file mode 100644
index 0000000..2d54b8b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.Executors;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.Executor;
+
+/** {@code TestingResourceCleanerFactory} for adding custom {@link ResourceCleaner} creation. */
+public class TestingResourceCleanerFactory implements ResourceCleanerFactory {
+
+    private final Collection<LocallyCleanableResource> locallyCleanableResources;
+    private final Collection<GloballyCleanableResource> globallyCleanableResources;
+
+    private final Executor cleanupExecutor;
+
+    private TestingResourceCleanerFactory(
+            Collection<LocallyCleanableResource> locallyCleanableResources,
+            Collection<GloballyCleanableResource> globallyCleanableResources,
+            Executor cleanupExecutor) {
+        this.locallyCleanableResources = locallyCleanableResources;
+        this.globallyCleanableResources = globallyCleanableResources;
+        this.cleanupExecutor = cleanupExecutor;
+    }
+
+    @Override
+    public ResourceCleaner createLocalResourceCleaner(
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        return createResourceCleaner(
+                mainThreadExecutor,
+                locallyCleanableResources,
+                LocallyCleanableResource::localCleanupAsync);
+    }
+
+    @Override
+    public ResourceCleaner createGlobalResourceCleaner(
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        return createResourceCleaner(
+                mainThreadExecutor,
+                globallyCleanableResources,
+                GloballyCleanableResource::globalCleanupAsync);
+    }
+
+    private <T> ResourceCleaner createResourceCleaner(
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Collection<T> resources,
+            DefaultResourceCleaner.CleanupFn<T> cleanupFn) {
+        return jobId -> {
+            mainThreadExecutor.assertRunningInMainThread();
+            Throwable t = null;
+            for (T resource : resources) {
+                try {
+                    cleanupFn.cleanupAsync(resource, jobId, cleanupExecutor).get();
+                } catch (Throwable throwable) {
+                    t = ExceptionUtils.firstOrSuppressed(throwable, t);
+                }
+            }
+            return t != null
+                    ? FutureUtils.completedExceptionally(t)
+                    : FutureUtils.completedVoidFuture();
+        };
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** {@code Builder} for creating {@code TestingResourceCleanerFactory} instances. */
+    public static class Builder {
+
+        private Collection<LocallyCleanableResource> locallyCleanableResources = new ArrayList<>();
+        private Collection<GloballyCleanableResource> globallyCleanableResources =
+                new ArrayList<>();
+
+        private Executor cleanupExecutor = Executors.directExecutor();
+
+        public Builder setLocallyCleanableResources(
+                Collection<LocallyCleanableResource> locallyCleanableResources) {
+            this.locallyCleanableResources = locallyCleanableResources;
+            return this;
+        }
+
+        public Builder withLocallyCleanableResource(
+                LocallyCleanableResource locallyCleanableResource) {
+            this.locallyCleanableResources.add(locallyCleanableResource);
+            return this;
+        }
+
+        public Builder setGloballyCleanableResources(
+                Collection<GloballyCleanableResource> globallyCleanableResources) {
+            this.globallyCleanableResources = globallyCleanableResources;
+            return this;
+        }
+
+        public Builder withGloballyCleanableResource(
+                GloballyCleanableResource globallyCleanableResource) {
+            this.globallyCleanableResources.add(globallyCleanableResource);
+            return this;
+        }
+
+        public Builder setCleanupExecutor(Executor cleanupExecutor) {
+            this.cleanupExecutor = cleanupExecutor;
+            return this;
+        }
+
+        public TestingResourceCleanerFactory build() {
+            return new TestingResourceCleanerFactory(
+                    locallyCleanableResources, globallyCleanableResources, cleanupExecutor);
+        }
+    }
+}