Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/16 18:25:13 UTC

[flink] 01/06: [FLINK-26652][runtime] Makes the cleanup not fail fatally

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dfdc36c9223a922834c3bae403fa983a218b0ad0
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Tue Mar 15 15:59:08 2022 +0100

    [FLINK-26652][runtime] Makes the cleanup not fail fatally
    
    The user explicitly configured the cleanup retry logic to
    terminate after a certain number of attempts. Exhausting these
    attempts should be considered desired behavior and shouldn't
    make the cluster fail fatally.
---
 docs/content.zh/docs/deployment/ha/overview.md     |  20 ++--
 docs/content.zh/docs/deployment/overview.md        |   5 +
 docs/content/docs/deployment/ha/overview.md        |  20 ++--
 docs/content/docs/deployment/overview.md           |   7 +-
 .../generated/cleanup_configuration.html           |   2 +-
 ...ntial_delay_cleanup_strategy_configuration.html |   2 +-
 ...fixed_delay_cleanup_strategy_configuration.html |   2 +-
 .../apache/flink/configuration/CleanupOptions.java |  24 +++--
 .../flink/runtime/dispatcher/Dispatcher.java       |  19 +++-
 .../dispatcher/DispatcherCleanupITCase.java        | 115 ++++++++++-----------
 10 files changed, 126 insertions(+), 90 deletions(-)
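
The behavior change described in the commit message can be sketched in isolation as follows. This is a minimal illustration that uses only java.util.concurrent and is not the Dispatcher code itself: the class CleanupSketch and the helpers cleanupWithRetries and tolerateCleanupFailure are hypothetical names invented for this example, and the retry loop is a simplified stand-in for Flink's configurable cleanup strategies. The only part taken from the patch is the idea of attaching exceptionally(...) to the cleanup future so that exhausted retries produce a warning rather than an exceptionally completed future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class CleanupSketch {

    /** Hypothetical helper: runs the cleanup and retries it until maxAttempts is reached. */
    static CompletableFuture<Void> cleanupWithRetries(
            Supplier<CompletableFuture<Void>> cleanup, int maxAttempts) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        attempt(cleanup, maxAttempts, result);
        return result;
    }

    private static void attempt(
            Supplier<CompletableFuture<Void>> cleanup,
            int remainingAttempts,
            CompletableFuture<Void> result) {
        cleanup.get()
                .whenComplete(
                        (ignored, error) -> {
                            if (error == null) {
                                result.complete(null);
                            } else if (remainingAttempts > 1) {
                                // Retry immediately; a real strategy would add a delay here.
                                attempt(cleanup, remainingAttempts - 1, result);
                            } else {
                                // Retry limit reached: propagate the last error.
                                result.completeExceptionally(error);
                            }
                        });
    }

    /** Hypothetical helper: logs a warning instead of letting the failure propagate. */
    static CompletableFuture<Void> tolerateCleanupFailure(
            CompletableFuture<Void> cleanupFuture, String jobId) {
        return cleanupFuture.exceptionally(
                error -> {
                    System.err.println(
                            "The cleanup of job " + jobId
                                    + " failed and needs to be done manually: " + error);
                    return null;
                });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // A cleanup that fails on every call, similar in spirit to the test's failing store.
        Supplier<CompletableFuture<Void>> alwaysFailing =
                () -> {
                    calls.incrementAndGet();
                    CompletableFuture<Void> failed = new CompletableFuture<>();
                    failed.completeExceptionally(
                            new RuntimeException("Unable to remove job graph."));
                    return failed;
                };

        CompletableFuture<Void> termination =
                tolerateCleanupFailure(cleanupWithRetries(alwaysFailing, 3), "job-1");

        System.out.println(
                "attempts=" + calls.get()
                        + ", failedFatally=" + termination.isCompletedExceptionally());
    }
}
```

Running main prints the warning on standard error and then "attempts=3, failedFatally=false" on standard output: the retry limit is honored, and any handler registered downstream for uncaught exceptions would not fire. In this sketch the failed job is simply left as-is; in Flink, per the documentation changes in this commit, the job stays in a dirty state and its artifacts may need to be removed manually.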

diff --git a/docs/content.zh/docs/deployment/ha/overview.md b/docs/content.zh/docs/deployment/ha/overview.md
index c7e516b..0cfcd30 100644
--- a/docs/content.zh/docs/deployment/ha/overview.md
+++ b/docs/content.zh/docs/deployment/ha/overview.md
@@ -76,14 +76,16 @@ Flink 提供了两种高可用服务实现:
 
 ## JobResultStore
 
-In order to preserve a job's scheduling status across failover events and prevent erroneous
-re-execution of globally terminated (i.e. finished, cancelled or failed) jobs, Flink persists
-status of terminated jobs to a filesystem using the JobResultStore.
-The JobResultStore allows job results to outlive a finished job, and can be used by
-Flink components involved in the recovery of a highly-available cluster in order to
-determine whether a job should be subject to recovery.
-
-The JobResultStore has sensible defaults for its behaviour, such as result storage
-location, but these can be [configured]({{< ref "docs/deployment/config#high-availability" >}}).
+The JobResultStore is used to archive the final result of a job that reached a globally-terminal
+state (i.e. finished, cancelled or failed). The data is stored on a file system (see
+[job-result-store.storage-path]({{< ref "docs/deployment/config#job-result-store-storage-path" >}})).
+Entries in this store are marked as dirty as long as the corresponding job wasn't cleaned up properly
+(artifacts are found in the job's subfolder in [high-availability.storageDir]({{< ref "docs/deployment/config#high-availability-storagedir" >}})).
+
+Dirty entries are subject to cleanup, i.e. the corresponding job is either cleaned up by Flink at
+the moment or will be picked up for cleanup as part of a recovery. The entries will be deleted as
+soon as the cleanup succeeds. Check the JobResultStore configuration parameters under
+[HA configuration options]({{< ref "docs/deployment/config#high-availability" >}}) for further
+details on how to adapt the behavior.
 
 {{< top >}}
diff --git a/docs/content.zh/docs/deployment/overview.md b/docs/content.zh/docs/deployment/overview.md
index df6abea..f10dab3 100644
--- a/docs/content.zh/docs/deployment/overview.md
+++ b/docs/content.zh/docs/deployment/overview.md
@@ -158,6 +158,11 @@ Once a job has reached a globally terminal state of either finished, failed or c
 external component resources associated with the job are then cleaned up. In the event of a
 failure when cleaning up a resource, Flink will attempt to retry the cleanup. You can
 [configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+Reaching the maximum number of retries without succeeding will leave the job in a dirty state.
+Its artifacts would need to be cleaned up manually (see the
+[High Availability Services / JobResultStore]({{< ref "docs/deployment/ha/overview#jobresultstore" >}})
+section for further details). Restarting the very same job (i.e. using the same
+job ID) will result in the cleanup being restarted without running the job again.
 
 There is currently an issue with the cleanup of CompletedCheckpoints that failed to be deleted
 while subsuming them as part of the usual CompletedCheckpoint management. These artifacts are
diff --git a/docs/content/docs/deployment/ha/overview.md b/docs/content/docs/deployment/ha/overview.md
index 36cdcb4..1939474 100644
--- a/docs/content/docs/deployment/ha/overview.md
+++ b/docs/content/docs/deployment/ha/overview.md
@@ -82,14 +82,16 @@ Once this happens, all the HA data, including the metadata stored in the HA serv
 
 ## JobResultStore
 
-In order to preserve a job's scheduling status across failover events and prevent erroneous
-re-execution of globally terminated (i.e. finished, cancelled or failed) jobs, Flink persists
-status of terminated jobs to a filesystem using the JobResultStore.
-The JobResultStore allows job results to outlive a finished job, and can be used by
-Flink components involved in the recovery of a highly-available cluster in order to
-determine whether a job should be subject to recovery.
-
-The JobResultStore has sensible defaults for its behaviour, such as result storage
-location, but these can be [configured]({{< ref "docs/deployment/config#high-availability" >}}).
+The JobResultStore is used to archive the final result of a job that reached a globally-terminal 
+state (i.e. finished, cancelled or failed). The data is stored on a file system (see 
+[job-result-store.storage-path]({{< ref "docs/deployment/config#job-result-store-storage-path" >}})).
+Entries in this store are marked as dirty as long as the corresponding job wasn't cleaned up properly 
+(artifacts are found in the job's subfolder in [high-availability.storageDir]({{< ref "docs/deployment/config#high-availability-storagedir" >}})).
+
+Dirty entries are subject to cleanup, i.e. the corresponding job is either cleaned up by Flink at 
+the moment or will be picked up for cleanup as part of a recovery. The entries will be deleted as 
+soon as the cleanup succeeds. Check the JobResultStore configuration parameters under 
+[HA configuration options]({{< ref "docs/deployment/config#high-availability" >}}) for further 
+details on how to adapt the behavior.
 
 {{< top >}}
diff --git a/docs/content/docs/deployment/overview.md b/docs/content/docs/deployment/overview.md
index 64bac26..247605e 100644
--- a/docs/content/docs/deployment/overview.md
+++ b/docs/content/docs/deployment/overview.md
@@ -158,7 +158,12 @@ When deploying Flink, there are often multiple options available for each buildi
 Once a job has reached a globally terminal state of either finished, failed or cancelled, the
 external component resources associated with the job are then cleaned up. In the event of a
 failure when cleaning up a resource, Flink will attempt to retry the cleanup. You can
-[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used. 
+Reaching the maximum number of retries without succeeding will leave the job in a dirty state. 
+Its artifacts would need to be cleaned up manually (see the 
+[High Availability Services / JobResultStore]({{< ref "docs/deployment/ha/overview#jobresultstore" >}})
+section for further details). Restarting the very same job (i.e. using the same 
+job ID) will result in the cleanup being restarted without running the job again.
 
 There is currently an issue with the cleanup of CompletedCheckpoints that failed to be deleted 
 while subsuming them as part of the usual CompletedCheckpoint management. These artifacts are 
diff --git a/docs/layouts/shortcodes/generated/cleanup_configuration.html b/docs/layouts/shortcodes/generated/cleanup_configuration.html
index 02e8618..e0663c3 100644
--- a/docs/layouts/shortcodes/generated/cleanup_configuration.html
+++ b/docs/layouts/shortcodes/generated/cleanup_configuration.html
@@ -12,7 +12,7 @@
             <td><h5>cleanup-strategy</h5></td>
             <td style="word-wrap: break-word;">"exponential-delay"</td>
             <td>String</td>
-            <td>Defines the cleanup strategy to use in case of cleanup failures.<br />Accepted values are:<ul><li><code class="highlighter-rouge">none</code>, <code class="highlighter-rouge">disable</code>, <code class="highlighter-rouge">off</code>: Cleanup is only performed once. No retry will be initiated in case of failure.</li><li><code class="highlighter-rouge">fixed-delay</code>, <code class="highlighter-rouge">fixeddelay</code>: Cleanup attempts will be separated by a fixed inter [...]
+            <td>Defines the cleanup strategy to use in case of cleanup failures.<br />Accepted values are:<ul><li><code class="highlighter-rouge">none</code>, <code class="highlighter-rouge">disable</code>, <code class="highlighter-rouge">off</code>: Cleanup is only performed once. No retry will be initiated in case of failure. The job artifacts (and the job's JobResultStore entry) have to be cleaned up manually in case of a failure.</li><li><code class="highlighter-rouge">fixed-delay</c [...]
         </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/exponential_delay_cleanup_strategy_configuration.html b/docs/layouts/shortcodes/generated/exponential_delay_cleanup_strategy_configuration.html
index f301253..ea2933b 100644
--- a/docs/layouts/shortcodes/generated/exponential_delay_cleanup_strategy_configuration.html
+++ b/docs/layouts/shortcodes/generated/exponential_delay_cleanup_strategy_configuration.html
@@ -12,7 +12,7 @@
             <td><h5>cleanup-strategy.exponential-delay.attempts</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Integer</td>
-            <td>The number of times a failed cleanup is retried if <code class="highlighter-rouge">cleanup-strategy</code> has been set to <code class="highlighter-rouge">exponential-delay</code>. (no value means: infinitely).</td>
+            <td>The number of times a failed cleanup is retried if <code class="highlighter-rouge">cleanup-strategy</code> has been set to <code class="highlighter-rouge">exponential-delay</code>. Reaching the configured limit means that the job artifacts (and the job's JobResultStore entry) might need to be cleaned up manually. Setting no value corresponds to unlimited retries.</td>
         </tr>
         <tr>
             <td><h5>cleanup-strategy.exponential-delay.initial-backoff</h5></td>
diff --git a/docs/layouts/shortcodes/generated/fixed_delay_cleanup_strategy_configuration.html b/docs/layouts/shortcodes/generated/fixed_delay_cleanup_strategy_configuration.html
index 6890b1b..6312784 100644
--- a/docs/layouts/shortcodes/generated/fixed_delay_cleanup_strategy_configuration.html
+++ b/docs/layouts/shortcodes/generated/fixed_delay_cleanup_strategy_configuration.html
@@ -12,7 +12,7 @@
             <td><h5>cleanup-strategy.fixed-delay.attempts</h5></td>
             <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
-            <td>The number of times that Flink retries the cleanup before giving up if <code class="highlighter-rouge">cleanup-strategy</code> has been set to <code class="highlighter-rouge">fixed-delay</code>.</td>
+            <td>The number of times that Flink retries the cleanup before giving up if <code class="highlighter-rouge">cleanup-strategy</code> has been set to <code class="highlighter-rouge">fixed-delay</code>. Reaching the configured limit means that the job artifacts (and the job's JobResultStore entry) might need to be cleaned up manually.</td>
         </tr>
         <tr>
             <td><h5>cleanup-strategy.fixed-delay.delay</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
index 49d5dac..31268ef 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
@@ -93,7 +93,9 @@ public class CleanupOptions {
                                                                             Collectors.joining(
                                                                                     ", "))
                                                             + ": Cleanup is only performed once. No retry "
-                                                            + "will be initiated in case of failure.",
+                                                            + "will be initiated in case of failure. The job "
+                                                            + "artifacts (and the job's JobResultStore entry) have "
+                                                            + "to be cleaned up manually in case of a failure.",
                                                     NONE_PARAM_VALUES.stream()
                                                             .map(TextElement::code)
                                                             .collect(Collectors.toList())
@@ -105,7 +107,9 @@ public class CleanupOptions {
                                                     "%s, %s: Cleanup attempts will be separated by a fixed "
                                                             + "interval up to the point where the cleanup is "
                                                             + "considered successful or a set amount of retries "
-                                                            + "is reached.",
+                                                            + "is reached. Reaching the configured limit means that "
+                                                            + "the job artifacts (and the job's JobResultStore entry) "
+                                                            + "might need to be cleaned up manually.",
                                                     code(FIXED_DELAY_LABEL),
                                                     code(
                                                             extractAlphaNumericCharacters(
@@ -115,7 +119,9 @@ public class CleanupOptions {
                                                             + "triggers the cleanup with an exponentially "
                                                             + "increasing delay up to the point where the "
                                                             + "cleanup succeeded or a set amount of retries "
-                                                            + "is reached.",
+                                                            + "is reached. Reaching the configured limit means that "
+                                                            + "the job artifacts (and the job's JobResultStore entry) "
+                                                            + "might need to be cleaned up manually.",
                                                     code(EXPONENTIAL_DELAY_LABEL),
                                                     code(
                                                             extractAlphaNumericCharacters(
@@ -133,7 +139,10 @@ public class CleanupOptions {
                             Description.builder()
                                     .text(
                                             "The number of times that Flink retries the cleanup "
-                                                    + "before giving up if %s has been set to %s.",
+                                                    + "before giving up if %s has been set to %s. "
+                                                    + "Reaching the configured limit means that "
+                                                    + "the job artifacts (and the job's JobResultStore entry) "
+                                                    + "might need to be cleaned up manually.",
                                             code(CLEANUP_STRATEGY_PARAM), code(FIXED_DELAY_LABEL))
                                     .build());
 
@@ -188,8 +197,11 @@ public class CleanupOptions {
                             Description.builder()
                                     .text(
                                             "The number of times a failed cleanup is retried "
-                                                    + "if %s has been set to %s. (no value means: "
-                                                    + "infinitely).",
+                                                    + "if %s has been set to %s. Reaching the "
+                                                    + "configured limit means that the job artifacts "
+                                                    + "(and the job's JobResultStore entry) "
+                                                    + "might need to be cleaned up manually. "
+                                                    + "Setting no value corresponds to unlimited retries.",
                                             code(CLEANUP_STRATEGY_PARAM),
                                             code(EXPONENTIAL_DELAY_LABEL))
                                     .build());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 393a429..68f387d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -44,6 +45,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.JobResultEntry;
 import org.apache.flink.runtime.highavailability.JobResultStore;
+import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -615,7 +617,22 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
         final CompletableFuture<Void> jobTerminationFuture =
                 cleanupJobStateFuture.thenCompose(
-                        cleanupJobState -> removeJob(jobId, cleanupJobState));
+                        cleanupJobState ->
+                                removeJob(jobId, cleanupJobState)
+                                        .exceptionally(
+                                                throwable -> {
+                                                    log.warn(
+                                                            "The cleanup of job {} failed. The job's artifacts in '{}' and its JobResultStore entry in '{}' needs to be cleaned manually.",
+                                                            jobId,
+                                                            configuration.get(
+                                                                    HighAvailabilityOptions
+                                                                            .HA_STORAGE_PATH),
+                                                            configuration.get(
+                                                                    JobResultStoreOptions
+                                                                            .STORAGE_PATH),
+                                                            throwable);
+                                                    return null;
+                                                }));
 
         FutureUtils.handleUncaughtException(
                 jobTerminationFuture,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
index 8cfb1ad..60312f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.CleanupOptions;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
@@ -49,7 +50,6 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.TestingJobGraphStore;
 import org.apache.flink.runtime.testutils.TestingJobResultStore;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TimeUtils;
 import org.apache.flink.util.concurrent.FutureUtils;
 
@@ -63,11 +63,9 @@ import org.junit.Test;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -133,12 +131,23 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
         final int numberOfErrors = 5;
         final RuntimeException temporaryError =
                 new RuntimeException("Expected RuntimeException: Unable to remove job graph.");
+        final AtomicInteger failureCount = new AtomicInteger(numberOfErrors);
         final JobGraphStore jobGraphStore =
-                createAndStartJobGraphStoreWithCleanupFailures(
-                        numberOfErrors,
-                        temporaryError,
-                        actualGlobalCleanupCallCount,
-                        successfulCleanupLatch);
+                TestingJobGraphStore.newBuilder()
+                        .setGlobalCleanupFunction(
+                                (ignoredJobId, ignoredExecutor) -> {
+                                    actualGlobalCleanupCallCount.incrementAndGet();
+
+                                    if (failureCount.getAndDecrement() > 0) {
+                                        return FutureUtils.completedExceptionally(temporaryError);
+                                    }
+
+                                    successfulCleanupLatch.trigger();
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+
+        jobGraphStore.start(NoOpJobGraphListener.INSTANCE);
         haServices.setJobGraphStore(jobGraphStore);
 
         // Construct leader election service.
@@ -249,11 +258,28 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
 
         // Construct job graph store.
         final AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
-        final OneShotLatch successfulCleanupLatch = new OneShotLatch();
-        final RuntimeException temporaryError = new RuntimeException("Unable to remove job graph.");
+        final OneShotLatch firstCleanupTriggered = new OneShotLatch();
+        final CompletableFuture<JobID> successfulJobGraphCleanup = new CompletableFuture<>();
         final JobGraphStore jobGraphStore =
-                createAndStartJobGraphStoreWithCleanupFailures(
-                        1, temporaryError, actualGlobalCleanupCallCount, successfulCleanupLatch);
+                TestingJobGraphStore.newBuilder()
+                        .setGlobalCleanupFunction(
+                                (actualJobId, ignoredExecutor) -> {
+                                    final int callCount =
+                                            actualGlobalCleanupCallCount.getAndIncrement();
+                                    firstCleanupTriggered.trigger();
+
+                                    if (callCount < 1) {
+                                        return FutureUtils.completedExceptionally(
+                                                new RuntimeException(
+                                                        "Expected RuntimeException: Unable to remove job graph."));
+                                    }
+
+                                    successfulJobGraphCleanup.complete(actualJobId);
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+
+        jobGraphStore.start(NoOpJobGraphListener.INSTANCE);
         haServices.setJobGraphStore(jobGraphStore);
 
         // Construct leader election service.
@@ -262,23 +288,10 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
         haServices.setJobMasterLeaderElectionService(jobId, leaderElectionService);
 
         // start the dispatcher with no retries on cleanup
-        final CountDownLatch jobGraphRemovalErrorReceived = new CountDownLatch(1);
-        final Dispatcher dispatcher =
-                createTestingDispatcherBuilder()
-                        .setFatalErrorHandler(
-                                throwable -> {
-                                    final Optional<Throwable> maybeError =
-                                            ExceptionUtils.findThrowable(
-                                                    throwable, temporaryError::equals);
-                                    if (maybeError.isPresent()) {
-                                        jobGraphRemovalErrorReceived.countDown();
-                                    } else {
-                                        testingFatalErrorHandlerResource
-                                                .getFatalErrorHandler()
-                                                .onFatalError(throwable);
-                                    }
-                                })
-                        .build();
+        configuration.set(
+                CleanupOptions.CLEANUP_STRATEGY,
+                CleanupOptions.NONE_PARAM_VALUES.iterator().next());
+        final Dispatcher dispatcher = createTestingDispatcherBuilder().build();
         dispatcher.start();
 
         toTerminate.add(dispatcher);
@@ -288,7 +301,7 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
         dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
         waitForJobToFinish(leaderElectionService, dispatcherGateway, jobId);
-        jobGraphRemovalErrorReceived.await();
+        firstCleanupTriggered.await();
 
         // Remove job master leadership.
         leaderElectionService.notLeader();
@@ -296,18 +309,25 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
         // This will clear internal state of election service, so a new contender can register.
         leaderElectionService.stop();
 
-        assertThat(successfulCleanupLatch.isTriggered(), CoreMatchers.is(false));
+        assertThat(
+                "The cleanup should have been triggered only once.",
+                actualGlobalCleanupCallCount.get(),
+                equalTo(1));
+        assertThat(
+                "The cleanup should not have reached the successful cleanup code path.",
+                successfulJobGraphCleanup.isDone(),
+                equalTo(false));
 
         assertThat(
                 "The JobGraph is still stored in the JobGraphStore.",
                 haServices.getJobGraphStore().getJobIds(),
-                CoreMatchers.is(Collections.singleton(jobId)));
+                equalTo(Collections.singleton(jobId)));
         assertThat(
                 "The JobResultStore has this job marked as dirty.",
                 haServices.getJobResultStore().getDirtyResults().stream()
                         .map(JobResult::getJobId)
                         .collect(Collectors.toSet()),
-                CoreMatchers.is(Collections.singleton(jobId)));
+                equalTo(Collections.singleton(jobId)));
 
         // Run a second dispatcher, that restores our finished job.
         final Dispatcher secondDispatcher =
@@ -331,38 +351,11 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
                 "The JobResultStore has the job listed as clean.",
                 haServices.getJobResultStore().hasJobResultEntry(jobId));
 
-        // wait for the successful cleanup to be triggered
-        successfulCleanupLatch.await();
+        assertThat(successfulJobGraphCleanup.get(), equalTo(jobId));
 
         assertThat(actualGlobalCleanupCallCount.get(), equalTo(2));
     }
 
-    private JobGraphStore createAndStartJobGraphStoreWithCleanupFailures(
-            int numberOfCleanupFailures,
-            Throwable throwable,
-            AtomicInteger actualCleanupCallCount,
-            OneShotLatch successfulCleanupLatch)
-            throws Exception {
-        final AtomicInteger failureCount = new AtomicInteger(numberOfCleanupFailures);
-        final JobGraphStore jobGraphStore =
-                TestingJobGraphStore.newBuilder()
-                        .setGlobalCleanupFunction(
-                                (ignoredJobId, ignoredExecutor) -> {
-                                    actualCleanupCallCount.incrementAndGet();
-
-                                    if (failureCount.getAndDecrement() > 0) {
-                                        return FutureUtils.completedExceptionally(throwable);
-                                    }
-
-                                    successfulCleanupLatch.trigger();
-                                    return FutureUtils.completedVoidFuture();
-                                })
-                        .build();
-
-        jobGraphStore.start(null);
-        return jobGraphStore;
-    }
-
     private void waitForJobToFinish(
             TestingLeaderElectionService leaderElectionService,
             DispatcherGateway dispatcherGateway,