You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/04 08:02:22 UTC

[flink] 04/10: [FLINK-25432][runtime] Adds generic interfaces for cleaning up Job-related data

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bf30a9b175648fea4dda7aab748a6b6d73dfba27
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Thu Feb 3 17:53:18 2022 +0100

    [FLINK-25432][runtime] Adds generic interfaces for cleaning up Job-related data
---
 .../dispatcher/cleanup/DefaultResourceCleaner.java | 145 +++++++++++++
 .../cleanup/GloballyCleanableResource.java         |  46 ++++
 .../cleanup/LocallyCleanableResource.java          |  47 +++++
 .../dispatcher/cleanup/ResourceCleaner.java        |  36 ++++
 .../dispatcher/cleanup/ResourceCleanerFactory.java |  54 +++++
 .../cleanup/DefaultResourceCleanerTest.java        | 235 +++++++++++++++++++++
 6 files changed, 563 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
new file mode 100644
index 0000000..ce4b2d4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/** {@code DefaultResourceCleaner} is the default implementation of {@link ResourceCleaner}. */
+public class DefaultResourceCleaner<T> implements ResourceCleaner {
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+    private final Executor cleanupExecutor;
+    private final CleanupFn<T> cleanupFn;
+
+    private final Collection<T> prioritizedCleanup;
+    private final Collection<T> regularCleanup;
+
+    public static Builder<LocallyCleanableResource> forLocallyCleanableResources(
+            ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor) {
+        return forCleanableResources(
+                mainThreadExecutor, cleanupExecutor, LocallyCleanableResource::localCleanupAsync);
+    }
+
+    public static Builder<GloballyCleanableResource> forGloballyCleanableResources(
+            ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor) {
+        return forCleanableResources(
+                mainThreadExecutor, cleanupExecutor, GloballyCleanableResource::globalCleanupAsync);
+    }
+
+    @VisibleForTesting
+    static <T> Builder<T> forCleanableResources(
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Executor cleanupExecutor,
+            CleanupFn<T> cleanupFunction) {
+        return new Builder<>(mainThreadExecutor, cleanupExecutor, cleanupFunction);
+    }
+
+    @VisibleForTesting
+    @FunctionalInterface
+    interface CleanupFn<T> {
+        CompletableFuture<Void> cleanupAsync(T resource, JobID jobId, Executor cleanupExecutor);
+    }
+
+    /**
+     * {@code Builder} for creating {@code DefaultResourceCleaner} instances.
+     *
+     * @param <T> The functional interface that's being translated into the internally used {@link
+     *     CleanupFn}.
+     */
+    public static class Builder<T> {
+
+        private final ComponentMainThreadExecutor mainThreadExecutor;
+        private final Executor cleanupExecutor;
+        private final CleanupFn<T> cleanupFn;
+
+        private final Collection<T> prioritizedCleanup = new ArrayList<>();
+        private final Collection<T> regularCleanup = new ArrayList<>();
+
+        private Builder(
+                ComponentMainThreadExecutor mainThreadExecutor,
+                Executor cleanupExecutor,
+                CleanupFn<T> cleanupFn) {
+            this.mainThreadExecutor = mainThreadExecutor;
+            this.cleanupExecutor = cleanupExecutor;
+            this.cleanupFn = cleanupFn;
+        }
+
+        public Builder<T> withPrioritizedCleanup(T prioritizedCleanup) {
+            this.prioritizedCleanup.add(prioritizedCleanup);
+            return this;
+        }
+
+        public Builder<T> withRegularCleanup(T regularCleanup) {
+            this.regularCleanup.add(regularCleanup);
+            return this;
+        }
+
+        public DefaultResourceCleaner<T> build() {
+            return new DefaultResourceCleaner<>(
+                    mainThreadExecutor,
+                    cleanupExecutor,
+                    cleanupFn,
+                    prioritizedCleanup,
+                    regularCleanup);
+        }
+    }
+
+    private DefaultResourceCleaner(
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Executor cleanupExecutor,
+            CleanupFn<T> cleanupFn,
+            Collection<T> prioritizedCleanup,
+            Collection<T> regularCleanup) {
+        this.mainThreadExecutor = mainThreadExecutor;
+        this.cleanupExecutor = cleanupExecutor;
+        this.cleanupFn = cleanupFn;
+        this.prioritizedCleanup = prioritizedCleanup;
+        this.regularCleanup = regularCleanup;
+    }
+
+    @Override
+    public CompletableFuture<Void> cleanupAsync(JobID jobId) {
+        mainThreadExecutor.assertRunningInMainThread();
+        CompletableFuture<Void> cleanupFuture = FutureUtils.completedVoidFuture();
+        for (T cleanup : prioritizedCleanup) {
+            cleanupFuture =
+                    cleanupFuture.thenCompose(
+                            ignoredValue ->
+                                    cleanupFn.cleanupAsync(cleanup, jobId, cleanupExecutor));
+        }
+        return cleanupFuture.thenCompose(
+                ignoredValue ->
+                        FutureUtils.completeAll(
+                                regularCleanup.stream()
+                                        .map(
+                                                cleanup ->
+                                                        cleanupFn.cleanupAsync(
+                                                                cleanup, jobId, cleanupExecutor))
+                                        .collect(Collectors.toList())));
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java
new file mode 100644
index 0000000..66bfd32
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code GloballyCleanableResource} is supposed to be used by any class that provides artifacts for
+ * a given job that can be cleaned up globally. Globally available artifacts should survive a
+ * JobManager failover and are, in contrast to {@link LocallyCleanableResource}, only cleaned up
+ * after the corresponding job reached a globally-terminal state.
+ *
+ * @see org.apache.flink.api.common.JobStatus
+ */
+@FunctionalInterface
+public interface GloballyCleanableResource {
+
+    /**
+     * {@code globalCleanupAsync} is expected to be called from the main thread. Heavy IO tasks
+     * should be outsourced into the passed {@code cleanupExecutor}. Thread-safety must be ensured.
+     *
+     * @param jobId The {@link JobID} of the job for which the global data should be cleaned up.
+     * @param cleanupExecutor The fallback executor for IO-heavy operations.
+     * @return The cleanup result future.
+     */
+    CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor cleanupExecutor);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java
new file mode 100644
index 0000000..9e44a55
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code LocallyCleanableResource} is supposed to be used by any class that provides artifacts for
+ * a given job that can be cleaned up locally. Artifacts considered to be local are located on the
+ * JobManager instance itself and won't survive a failover scenario. These artifacts are, in
+ * contrast to {@link GloballyCleanableResource} artifacts, also cleaned up when the job only
+ * reaches a locally-terminal state.
+ *
+ * @see org.apache.flink.api.common.JobStatus
+ */
+@FunctionalInterface
+public interface LocallyCleanableResource {
+
+    /**
+     * {@code localCleanupAsync} is expected to be called from the main thread. Heavy IO tasks
+     * should be outsourced into the passed {@code cleanupExecutor}. Thread-safety must be ensured.
+     *
+     * @param jobId The {@link JobID} of the job for which the local data should be cleaned up.
+     * @param cleanupExecutor The fallback executor for IO-heavy operations.
+     * @return The cleanup result future.
+     */
+    CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor cleanupExecutor);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleaner.java
new file mode 100644
index 0000000..bfea159
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleaner.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.CompletableFuture;
+
+/** {@code ResourceCleaner} cleans up job-related data for a given {@code JobID}. */
+@FunctionalInterface
+public interface ResourceCleaner {
+
+    /**
+     * Cleans job-related data from resources asynchronously.
+     *
+     * @param jobId The {@link JobID} referring to the job for which the data shall be cleaned up.
+     * @return the cleanup result future.
+     */
+    CompletableFuture<Void> cleanupAsync(JobID jobId);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleanerFactory.java
new file mode 100644
index 0000000..9f724b1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleanerFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@code ResourceCleanerFactory} provides methods to create {@link ResourceCleaner} for local and
+ * global cleanup.
+ *
+ * @see GloballyCleanableResource
+ * @see LocallyCleanableResource
+ */
+public interface ResourceCleanerFactory {
+
+    /**
+     * Creates {@link ResourceCleaner} that initiates {@link
+     * LocallyCleanableResource#localCleanupAsync(JobID, Executor)} calls.
+     *
+     * @param mainThreadExecutor Used for validating that the {@link
+     *     LocallyCleanableResource#localCleanupAsync(JobID, Executor)} is called from the main
+     *     thread.
+     */
+    ResourceCleaner createLocalResourceCleaner(ComponentMainThreadExecutor mainThreadExecutor);
+
+    /**
+     * Creates {@link ResourceCleaner} that initiates {@link
+     * GloballyCleanableResource#globalCleanupAsync(JobID, Executor)} calls.
+     *
+     * @param mainThreadExecutor Used for validating that the {@link
+     *     GloballyCleanableResource#globalCleanupAsync(JobID, Executor)} is called from the main
+     *     thread.
+     */
+    ResourceCleaner createGlobalResourceCleaner(ComponentMainThreadExecutor mainThreadExecutor);
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
new file mode 100644
index 0000000..e8bbd4f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** {@code DefaultResourceCleanerTest} tests {@link DefaultResourceCleaner}. */
+public class DefaultResourceCleanerTest {
+
+    private static final Executor EXECUTOR = Executors.directExecutor();
+    private static final JobID JOB_ID = new JobID();
+
+    private DefaultResourceCleaner<CleanupCallback> testInstance;
+    private CleanupCallback cleanup0;
+    private CleanupCallback cleanup1;
+
+    @BeforeEach
+    public void setup() {
+        cleanup0 = CleanupCallback.withoutCompletionOnCleanup();
+        cleanup1 = CleanupCallback.withoutCompletionOnCleanup();
+
+        testInstance =
+                createTestInstanceBuilder()
+                        .withRegularCleanup(cleanup0)
+                        .withRegularCleanup(cleanup1)
+                        .build();
+    }
+
+    @Test
+    public void testSuccessfulConcurrentCleanup() {
+        CompletableFuture<Void> cleanupResult = testInstance.cleanupAsync(JOB_ID);
+
+        assertThat(cleanupResult).isNotCompleted();
+        assertThat(cleanup0).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID);
+        assertThat(cleanup1).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID);
+
+        cleanup0.completeCleanup();
+        assertThat(cleanupResult).isNotCompleted();
+
+        cleanup1.completeCleanup();
+        assertThat(cleanupResult).isCompleted();
+    }
+
+    @Test
+    public void testConcurrentCleanupWithExceptionFirst() {
+        CompletableFuture<Void> cleanupResult = testInstance.cleanupAsync(JOB_ID);
+
+        assertThat(cleanupResult).isNotCompleted();
+        assertThat(cleanup0).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID);
+        assertThat(cleanup1).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID);
+
+        final RuntimeException expectedException = new RuntimeException("Expected exception");
+        cleanup0.completeCleanupExceptionally(expectedException);
+        assertThat(cleanupResult).isNotCompleted();
+
+        cleanup1.completeCleanup();
+        assertThat(cleanupResult)
+                .failsWithin(Duration.ZERO)
+                .withThrowableOfType(ExecutionException.class)
+                .withCause(expectedException);
+    }
+
+    @Test
+    public void testConcurrentCleanupWithExceptionSecond() {
+        CompletableFuture<Void> cleanupResult = testInstance.cleanupAsync(JOB_ID);
+
+        assertThat(cleanupResult).isNotCompleted();
+        assertThat(cleanup0).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID);
+        assertThat(cleanup1).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID);
+
+        cleanup0.completeCleanup();
+        assertThat(cleanupResult).isNotCompleted();
+
+        final RuntimeException expectedException = new RuntimeException("Expected exception");
+        cleanup1.completeCleanupExceptionally(expectedException);
+        assertThat(cleanupResult)
+                .failsWithin(Duration.ZERO)
+                .withThrowableOfType(ExecutionException.class)
+                .withCause(expectedException);
+    }
+
+    @Test
+    public void testHighestPriorityCleanupBlocksAllOtherCleanups() {
+        final CleanupCallback highPriorityCleanup = CleanupCallback.withoutCompletionOnCleanup();
+        final CleanupCallback lowerThanHighPriorityCleanup =
+                CleanupCallback.withCompletionOnCleanup();
+        final CleanupCallback noPriorityCleanup0 = CleanupCallback.withCompletionOnCleanup();
+        final CleanupCallback noPriorityCleanup1 = CleanupCallback.withCompletionOnCleanup();
+
+        final DefaultResourceCleaner<CleanupCallback> testInstance =
+                createTestInstanceBuilder()
+                        .withPrioritizedCleanup(highPriorityCleanup)
+                        .withPrioritizedCleanup(lowerThanHighPriorityCleanup)
+                        .withRegularCleanup(noPriorityCleanup0)
+                        .withRegularCleanup(noPriorityCleanup1)
+                        .build();
+
+        final CompletableFuture<Void> overallCleanupResult = testInstance.cleanupAsync(JOB_ID);
+
+        assertThat(highPriorityCleanup.isDone()).isFalse();
+        assertThat(lowerThanHighPriorityCleanup.isDone()).isFalse();
+        assertThat(noPriorityCleanup0.isDone()).isFalse();
+        assertThat(noPriorityCleanup1.isDone()).isFalse();
+
+        assertThat(overallCleanupResult.isDone()).isFalse();
+
+        highPriorityCleanup.completeCleanup();
+
+        assertThat(overallCleanupResult).succeedsWithin(Duration.ofMillis(100));
+
+        assertThat(highPriorityCleanup.isDone()).isTrue();
+        assertThat(lowerThanHighPriorityCleanup.isDone()).isTrue();
+        assertThat(noPriorityCleanup0.isDone()).isTrue();
+        assertThat(noPriorityCleanup1.isDone()).isTrue();
+    }
+
+    @Test
+    public void testMediumPriorityCleanupBlocksAllLowerPrioritizedCleanups() {
+        final CleanupCallback highPriorityCleanup = CleanupCallback.withCompletionOnCleanup();
+        final CleanupCallback lowerThanHighPriorityCleanup =
+                CleanupCallback.withoutCompletionOnCleanup();
+        final CleanupCallback noPriorityCleanup0 = CleanupCallback.withCompletionOnCleanup();
+        final CleanupCallback noPriorityCleanup1 = CleanupCallback.withCompletionOnCleanup();
+
+        final DefaultResourceCleaner<CleanupCallback> testInstance =
+                createTestInstanceBuilder()
+                        .withPrioritizedCleanup(highPriorityCleanup)
+                        .withPrioritizedCleanup(lowerThanHighPriorityCleanup)
+                        .withRegularCleanup(noPriorityCleanup0)
+                        .withRegularCleanup(noPriorityCleanup1)
+                        .build();
+
+        assertThat(highPriorityCleanup.isDone()).isFalse();
+
+        final CompletableFuture<Void> overallCleanupResult = testInstance.cleanupAsync(JOB_ID);
+
+        assertThat(highPriorityCleanup.isDone()).isTrue();
+        assertThat(lowerThanHighPriorityCleanup.isDone()).isFalse();
+        assertThat(noPriorityCleanup0.isDone()).isFalse();
+        assertThat(noPriorityCleanup1.isDone()).isFalse();
+
+        assertThat(overallCleanupResult.isDone()).isFalse();
+
+        lowerThanHighPriorityCleanup.completeCleanup();
+
+        assertThat(overallCleanupResult).succeedsWithin(Duration.ofMillis(100));
+
+        assertThat(highPriorityCleanup.isDone()).isTrue();
+        assertThat(lowerThanHighPriorityCleanup.isDone()).isTrue();
+        assertThat(noPriorityCleanup0.isDone()).isTrue();
+        assertThat(noPriorityCleanup1.isDone()).isTrue();
+    }
+
+    private static DefaultResourceCleaner.Builder<CleanupCallback> createTestInstanceBuilder() {
+        return DefaultResourceCleaner.forCleanableResources(
+                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                EXECUTOR,
+                CleanupCallback::cleanup);
+    }
+
+    private static class CleanupCallback {
+
+        private final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        private JobID jobId;
+
+        private final Consumer<CompletableFuture<Void>> internalFunction;
+
+        public static CleanupCallback withCompletionOnCleanup() {
+            return new CleanupCallback(resultFuture -> resultFuture.complete(null));
+        }
+
+        public static CleanupCallback withoutCompletionOnCleanup() {
+            return new CleanupCallback(ignoredResultFuture -> {});
+        }
+
+        private CleanupCallback(Consumer<CompletableFuture<Void>> internalFunction) {
+            this.internalFunction = internalFunction;
+        }
+
+        public CompletableFuture<Void> cleanup(JobID jobId, Executor executor) {
+            Preconditions.checkState(this.jobId == null);
+            this.jobId = jobId;
+
+            internalFunction.accept(resultFuture);
+
+            return resultFuture;
+        }
+
+        public boolean isDone() {
+            return resultFuture.isDone();
+        }
+
+        public JobID getProcessedJobId() {
+            return jobId;
+        }
+
+        public void completeCleanup() {
+            this.resultFuture.complete(null);
+        }
+
+        public void completeCleanupExceptionally(Throwable expectedException) {
+            this.resultFuture.completeExceptionally(expectedException);
+        }
+    }
+}