You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/04 08:02:22 UTC
[flink] 04/10: [FLINK-25432][runtime] Adds generic interfaces for cleaning up Job-related data
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bf30a9b175648fea4dda7aab748a6b6d73dfba27
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Thu Feb 3 17:53:18 2022 +0100
[FLINK-25432][runtime] Adds generic interfaces for cleaning up Job-related data
---
.../dispatcher/cleanup/DefaultResourceCleaner.java | 145 +++++++++++++
.../cleanup/GloballyCleanableResource.java | 46 ++++
.../cleanup/LocallyCleanableResource.java | 47 +++++
.../dispatcher/cleanup/ResourceCleaner.java | 36 ++++
.../dispatcher/cleanup/ResourceCleanerFactory.java | 54 +++++
.../cleanup/DefaultResourceCleanerTest.java | 235 +++++++++++++++++++++
6 files changed, 563 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
new file mode 100644
index 0000000..ce4b2d4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/** {@code DefaultResourceCleaner} is the default implementation of {@link ResourceCleaner}. */
+public class DefaultResourceCleaner<T> implements ResourceCleaner {
+
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+ private final Executor cleanupExecutor;
+ private final CleanupFn<T> cleanupFn;
+
+ private final Collection<T> prioritizedCleanup;
+ private final Collection<T> regularCleanup;
+
+ public static Builder<LocallyCleanableResource> forLocallyCleanableResources(
+ ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor) {
+ return forCleanableResources(
+ mainThreadExecutor, cleanupExecutor, LocallyCleanableResource::localCleanupAsync);
+ }
+
+ public static Builder<GloballyCleanableResource> forGloballyCleanableResources(
+ ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor) {
+ return forCleanableResources(
+ mainThreadExecutor, cleanupExecutor, GloballyCleanableResource::globalCleanupAsync);
+ }
+
+ @VisibleForTesting
+ static <T> Builder<T> forCleanableResources(
+ ComponentMainThreadExecutor mainThreadExecutor,
+ Executor cleanupExecutor,
+ CleanupFn<T> cleanupFunction) {
+ return new Builder<>(mainThreadExecutor, cleanupExecutor, cleanupFunction);
+ }
+
+ @VisibleForTesting
+ @FunctionalInterface
+ interface CleanupFn<T> {
+ CompletableFuture<Void> cleanupAsync(T resource, JobID jobId, Executor cleanupExecutor);
+ }
+
+ /**
+ * {@code Builder} for creating {@code DefaultResourceCleaner} instances.
+ *
+ * @param <T> The functional interface that's being translated into the internally used {@link
+ * CleanupFn}.
+ */
+ public static class Builder<T> {
+
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+ private final Executor cleanupExecutor;
+ private final CleanupFn<T> cleanupFn;
+
+ private final Collection<T> prioritizedCleanup = new ArrayList<>();
+ private final Collection<T> regularCleanup = new ArrayList<>();
+
+ private Builder(
+ ComponentMainThreadExecutor mainThreadExecutor,
+ Executor cleanupExecutor,
+ CleanupFn<T> cleanupFn) {
+ this.mainThreadExecutor = mainThreadExecutor;
+ this.cleanupExecutor = cleanupExecutor;
+ this.cleanupFn = cleanupFn;
+ }
+
+ public Builder<T> withPrioritizedCleanup(T prioritizedCleanup) {
+ this.prioritizedCleanup.add(prioritizedCleanup);
+ return this;
+ }
+
+ public Builder<T> withRegularCleanup(T regularCleanup) {
+ this.regularCleanup.add(regularCleanup);
+ return this;
+ }
+
+ public DefaultResourceCleaner<T> build() {
+ return new DefaultResourceCleaner<>(
+ mainThreadExecutor,
+ cleanupExecutor,
+ cleanupFn,
+ prioritizedCleanup,
+ regularCleanup);
+ }
+ }
+
+ private DefaultResourceCleaner(
+ ComponentMainThreadExecutor mainThreadExecutor,
+ Executor cleanupExecutor,
+ CleanupFn<T> cleanupFn,
+ Collection<T> prioritizedCleanup,
+ Collection<T> regularCleanup) {
+ this.mainThreadExecutor = mainThreadExecutor;
+ this.cleanupExecutor = cleanupExecutor;
+ this.cleanupFn = cleanupFn;
+ this.prioritizedCleanup = prioritizedCleanup;
+ this.regularCleanup = regularCleanup;
+ }
+
+ @Override
+ public CompletableFuture<Void> cleanupAsync(JobID jobId) {
+ mainThreadExecutor.assertRunningInMainThread();
+ CompletableFuture<Void> cleanupFuture = FutureUtils.completedVoidFuture();
+ for (T cleanup : prioritizedCleanup) {
+ cleanupFuture =
+ cleanupFuture.thenCompose(
+ ignoredValue ->
+ cleanupFn.cleanupAsync(cleanup, jobId, cleanupExecutor));
+ }
+ return cleanupFuture.thenCompose(
+ ignoredValue ->
+ FutureUtils.completeAll(
+ regularCleanup.stream()
+ .map(
+ cleanup ->
+ cleanupFn.cleanupAsync(
+ cleanup, jobId, cleanupExecutor))
+ .collect(Collectors.toList())));
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java
new file mode 100644
index 0000000..66bfd32
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code GloballyCleanableResource} is supposed to be used by any class that provides artifacts for
+ * a given job that can be cleaned up globally. Globally available artifacts should survive a
+ * JobManager failover and are, in contrast to {@link LocallyCleanableResource}, only cleaned up
+ * after the corresponding job reached a globally-terminal state.
+ *
+ * @see org.apache.flink.api.common.JobStatus
+ */
+@FunctionalInterface
+public interface GloballyCleanableResource {
+
+ /**
+ * {@code globalCleanupAsync} is expected to be called from the main thread. Heavy IO tasks
+ * should be outsourced into the passed {@code cleanupExecutor}. Thread-safety must be ensured.
+ *
+ * @param jobId The {@link JobID} of the job for which the global data should be cleaned up.
+ * @param cleanupExecutor The fallback executor for IO-heavy operations.
+ * @return The cleanup result future.
+ */
+ CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor cleanupExecutor);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java
new file mode 100644
index 0000000..9e44a55
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code LocallyCleanableResource} is supposed to be used by any class that provides artifacts for
+ * a given job that can be cleaned up locally. Artifacts considered to be local are located on the
+ * JobManager instance itself and won't survive a failover scenario. These artifacts are, in
+ * contrast to {@link GloballyCleanableResource} artifacts, going to be cleaned up even after the
+ * job reaches a locally-terminal state.
+ *
+ * @see org.apache.flink.api.common.JobStatus
+ */
+@FunctionalInterface
+public interface LocallyCleanableResource {
+
+ /**
+ * {@code localCleanupAsync} is expected to be called from the main thread. Heavy IO tasks
+ * should be outsourced into the passed {@code cleanupExecutor}. Thread-safety must be ensured.
+ *
+ * @param jobId The {@link JobID} of the job for which the local data should be cleaned up.
+ * @param cleanupExecutor The fallback executor for IO-heavy operations.
+ * @return The cleanup result future.
+ */
+ CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor cleanupExecutor);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleaner.java
new file mode 100644
index 0000000..bfea159
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleaner.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.CompletableFuture;
+
+/** {@code ResourceCleaner} cleans up job-related data for the given {@code JobID}. */
+@FunctionalInterface
+public interface ResourceCleaner {
+
+ /**
+ * Cleans job-related data from resources asynchronously.
+ *
+ * @param jobId The {@link JobID} referring to the job for which the data shall be cleaned up.
+ * @return the cleanup result future.
+ */
+ CompletableFuture<Void> cleanupAsync(JobID jobId);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleanerFactory.java
new file mode 100644
index 0000000..9f724b1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleanerFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@code ResourceCleanerFactory} provides methods to create {@link ResourceCleaner} for local and
+ * global cleanup.
+ *
+ * @see GloballyCleanableResource
+ * @see LocallyCleanableResource
+ */
+public interface ResourceCleanerFactory {
+
+ /**
+ * Creates {@link ResourceCleaner} that initiates {@link
+ * LocallyCleanableResource#localCleanupAsync(JobID, Executor)} calls.
+ *
+ * @param mainThreadExecutor Used for validating that the {@link
+ * LocallyCleanableResource#localCleanupAsync(JobID, Executor)} is called from the main
+ * thread.
+ */
+ ResourceCleaner createLocalResourceCleaner(ComponentMainThreadExecutor mainThreadExecutor);
+
+ /**
+ * Creates {@link ResourceCleaner} that initiates {@link
+ * GloballyCleanableResource#globalCleanupAsync(JobID, Executor)} calls.
+ *
+ * @param mainThreadExecutor Used for validating that the {@link
+ * GloballyCleanableResource#globalCleanupAsync(JobID, Executor)} is called from the main
+ * thread.
+ */
+ ResourceCleaner createGlobalResourceCleaner(ComponentMainThreadExecutor mainThreadExecutor);
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
new file mode 100644
index 0000000..e8bbd4f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** {@code DefaultResourceCleanerTest} tests {@link DefaultResourceCleaner}. */
+public class DefaultResourceCleanerTest {
+
+ private static final Executor EXECUTOR = Executors.directExecutor();
+ private static final JobID JOB_ID = new JobID();
+
+ private DefaultResourceCleaner<CleanupCallback> testInstance;
+ private CleanupCallback cleanup0;
+ private CleanupCallback cleanup1;
+
+ @BeforeEach
+ public void setup() {
+ cleanup0 = CleanupCallback.withoutCompletionOnCleanup();
+ cleanup1 = CleanupCallback.withoutCompletionOnCleanup();
+
+ testInstance =
+ createTestInstanceBuilder()
+ .withRegularCleanup(cleanup0)
+ .withRegularCleanup(cleanup1)
+ .build();
+ }
+
+ @Test
+ public void testSuccessfulConcurrentCleanup() {
+ CompletableFuture<Void> cleanupResult = testInstance.cleanupAsync(JOB_ID);
+
+ assertThat(cleanupResult).isNotCompleted();
+ assertThat(cleanup0).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID);
+ assertThat(cleanup1).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID);
+
+ cleanup0.completeCleanup();
+ assertThat(cleanupResult).isNotCompleted();
+
+ cleanup1.completeCleanup();
+ assertThat(cleanupResult).isCompleted();
+ }
+
+ @Test
+ public void testConcurrentCleanupWithExceptionFirst() {
+ CompletableFuture<Void> cleanupResult = testInstance.cleanupAsync(JOB_ID);
+
+ assertThat(cleanupResult).isNotCompleted();
+ assertThat(cleanup0).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID);
+ assertThat(cleanup1).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID);
+
+ final RuntimeException expectedException = new RuntimeException("Expected exception");
+ cleanup0.completeCleanupExceptionally(expectedException);
+ assertThat(cleanupResult).isNotCompleted();
+
+ cleanup1.completeCleanup();
+ assertThat(cleanupResult)
+ .failsWithin(Duration.ZERO)
+ .withThrowableOfType(ExecutionException.class)
+ .withCause(expectedException);
+ }
+
+ @Test
+ public void testConcurrentCleanupWithExceptionSecond() {
+ CompletableFuture<Void> cleanupResult = testInstance.cleanupAsync(JOB_ID);
+
+ assertThat(cleanupResult).isNotCompleted();
+ assertThat(cleanup0).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID);
+ assertThat(cleanup1).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID);
+
+ cleanup0.completeCleanup();
+ assertThat(cleanupResult).isNotCompleted();
+
+ final RuntimeException expectedException = new RuntimeException("Expected exception");
+ cleanup1.completeCleanupExceptionally(expectedException);
+ assertThat(cleanupResult)
+ .failsWithin(Duration.ZERO)
+ .withThrowableOfType(ExecutionException.class)
+ .withCause(expectedException);
+ }
+
+ @Test
+ public void testHighestPriorityCleanupBlocksAllOtherCleanups() {
+ final CleanupCallback highPriorityCleanup = CleanupCallback.withoutCompletionOnCleanup();
+ final CleanupCallback lowerThanHighPriorityCleanup =
+ CleanupCallback.withCompletionOnCleanup();
+ final CleanupCallback noPriorityCleanup0 = CleanupCallback.withCompletionOnCleanup();
+ final CleanupCallback noPriorityCleanup1 = CleanupCallback.withCompletionOnCleanup();
+
+ final DefaultResourceCleaner<CleanupCallback> testInstance =
+ createTestInstanceBuilder()
+ .withPrioritizedCleanup(highPriorityCleanup)
+ .withPrioritizedCleanup(lowerThanHighPriorityCleanup)
+ .withRegularCleanup(noPriorityCleanup0)
+ .withRegularCleanup(noPriorityCleanup1)
+ .build();
+
+ final CompletableFuture<Void> overallCleanupResult = testInstance.cleanupAsync(JOB_ID);
+
+ assertThat(highPriorityCleanup.isDone()).isFalse();
+ assertThat(lowerThanHighPriorityCleanup.isDone()).isFalse();
+ assertThat(noPriorityCleanup0.isDone()).isFalse();
+ assertThat(noPriorityCleanup1.isDone()).isFalse();
+
+ assertThat(overallCleanupResult.isDone()).isFalse();
+
+ highPriorityCleanup.completeCleanup();
+
+ assertThat(overallCleanupResult).succeedsWithin(Duration.ofMillis(100));
+
+ assertThat(highPriorityCleanup.isDone()).isTrue();
+ assertThat(lowerThanHighPriorityCleanup.isDone()).isTrue();
+ assertThat(noPriorityCleanup0.isDone()).isTrue();
+ assertThat(noPriorityCleanup1.isDone()).isTrue();
+ }
+
+ @Test
+ public void testMediumPriorityCleanupBlocksAllLowerPrioritizedCleanups() {
+ final CleanupCallback highPriorityCleanup = CleanupCallback.withCompletionOnCleanup();
+ final CleanupCallback lowerThanHighPriorityCleanup =
+ CleanupCallback.withoutCompletionOnCleanup();
+ final CleanupCallback noPriorityCleanup0 = CleanupCallback.withCompletionOnCleanup();
+ final CleanupCallback noPriorityCleanup1 = CleanupCallback.withCompletionOnCleanup();
+
+ final DefaultResourceCleaner<CleanupCallback> testInstance =
+ createTestInstanceBuilder()
+ .withPrioritizedCleanup(highPriorityCleanup)
+ .withPrioritizedCleanup(lowerThanHighPriorityCleanup)
+ .withRegularCleanup(noPriorityCleanup0)
+ .withRegularCleanup(noPriorityCleanup1)
+ .build();
+
+ assertThat(highPriorityCleanup.isDone()).isFalse();
+
+ final CompletableFuture<Void> overallCleanupResult = testInstance.cleanupAsync(JOB_ID);
+
+ assertThat(highPriorityCleanup.isDone()).isTrue();
+ assertThat(lowerThanHighPriorityCleanup.isDone()).isFalse();
+ assertThat(noPriorityCleanup0.isDone()).isFalse();
+ assertThat(noPriorityCleanup1.isDone()).isFalse();
+
+ assertThat(overallCleanupResult.isDone()).isFalse();
+
+ lowerThanHighPriorityCleanup.completeCleanup();
+
+ assertThat(overallCleanupResult).succeedsWithin(Duration.ofMillis(100));
+
+ assertThat(highPriorityCleanup.isDone()).isTrue();
+ assertThat(lowerThanHighPriorityCleanup.isDone()).isTrue();
+ assertThat(noPriorityCleanup0.isDone()).isTrue();
+ assertThat(noPriorityCleanup1.isDone()).isTrue();
+ }
+
+ private static DefaultResourceCleaner.Builder<CleanupCallback> createTestInstanceBuilder() {
+ return DefaultResourceCleaner.forCleanableResources(
+ ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+ EXECUTOR,
+ CleanupCallback::cleanup);
+ }
+
+ private static class CleanupCallback {
+
+ private final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+ private JobID jobId;
+
+ private final Consumer<CompletableFuture<Void>> internalFunction;
+
+ public static CleanupCallback withCompletionOnCleanup() {
+ return new CleanupCallback(resultFuture -> resultFuture.complete(null));
+ }
+
+ public static CleanupCallback withoutCompletionOnCleanup() {
+ return new CleanupCallback(ignoredResultFuture -> {});
+ }
+
+ private CleanupCallback(Consumer<CompletableFuture<Void>> internalFunction) {
+ this.internalFunction = internalFunction;
+ }
+
+ public CompletableFuture<Void> cleanup(JobID jobId, Executor executor) {
+ Preconditions.checkState(this.jobId == null);
+ this.jobId = jobId;
+
+ internalFunction.accept(resultFuture);
+
+ return resultFuture;
+ }
+
+ public boolean isDone() {
+ return resultFuture.isDone();
+ }
+
+ public JobID getProcessedJobId() {
+ return jobId;
+ }
+
+ public void completeCleanup() {
+ this.resultFuture.complete(null);
+ }
+
+ public void completeCleanupExceptionally(Throwable expectedException) {
+ this.resultFuture.completeExceptionally(expectedException);
+ }
+ }
+}