You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/10 14:16:18 UTC
[flink] 03/03: [FLINK-26494][runtime] Adds log message to cleanup failure
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c5352fc55972420ed5bf1afdfd97834540b1407a
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Fri Mar 4 21:51:30 2022 +0100
[FLINK-26494][runtime] Adds log message to cleanup failure
---
.../dispatcher/cleanup/DefaultResourceCleaner.java | 102 +++++++++++++++++----
.../cleanup/DispatcherResourceCleanerFactory.java | 27 ++++--
.../cleanup/DefaultResourceCleanerTest.java | 41 +++++----
3 files changed, 125 insertions(+), 45 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
index 6226f74..77c40d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.RetryStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
@@ -37,12 +40,14 @@ import java.util.stream.Collectors;
*/
public class DefaultResourceCleaner<T> implements ResourceCleaner {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceCleaner.class);
+
private final ComponentMainThreadExecutor mainThreadExecutor;
private final Executor cleanupExecutor;
private final CleanupFn<T> cleanupFn;
- private final Collection<T> prioritizedCleanup;
- private final Collection<T> regularCleanup;
+ private final Collection<CleanupWithLabel<T>> prioritizedCleanup;
+ private final Collection<CleanupWithLabel<T>> regularCleanup;
private final RetryStrategy retryStrategy;
@@ -97,8 +102,8 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
private final RetryStrategy retryStrategy;
- private final Collection<T> prioritizedCleanup = new ArrayList<>();
- private final Collection<T> regularCleanup = new ArrayList<>();
+ private final Collection<CleanupWithLabel<T>> prioritizedCleanup = new ArrayList<>();
+ private final Collection<CleanupWithLabel<T>> regularCleanup = new ArrayList<>();
private Builder(
ComponentMainThreadExecutor mainThreadExecutor,
@@ -117,10 +122,13 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
* resources are added matters, i.e. if two cleanable resources are added as prioritized
* cleanup tasks, the resource being added first will block the cleanup of the second
* resource. All prioritized cleanup resources will run and finish before any resource that
- * is added using {@link #withRegularCleanup(Object)} is started.
+ * is added using {@link #withRegularCleanup(String, Object)} is started.
+ *
+ * @param label The label being used when logging errors in the given cleanup.
+ * @param prioritizedCleanup The cleanup callback that is going to be prioritized.
*/
- public Builder<T> withPrioritizedCleanup(T prioritizedCleanup) {
- this.prioritizedCleanup.add(prioritizedCleanup);
+ public Builder<T> withPrioritizedCleanup(String label, T prioritizedCleanup) {
+ this.prioritizedCleanup.add(new CleanupWithLabel<>(prioritizedCleanup, label));
return this;
}
@@ -128,10 +136,13 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
* Regular cleanups are resources for which the cleanup is triggered after all prioritized
* cleanups succeeded. All added regular cleanups will run concurrently to each other.
*
- * @see #withPrioritizedCleanup(Object)
+ * @param label The label being used when logging errors in the given cleanup.
+ * @param regularCleanup The cleanup callback that is going to run after all prioritized
+ * cleanups are finished.
+ * @see #withPrioritizedCleanup(String, Object)
*/
- public Builder<T> withRegularCleanup(T regularCleanup) {
- this.regularCleanup.add(regularCleanup);
+ public Builder<T> withRegularCleanup(String label, T regularCleanup) {
+ this.regularCleanup.add(new CleanupWithLabel<>(regularCleanup, label));
return this;
}
@@ -150,8 +161,8 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
ComponentMainThreadExecutor mainThreadExecutor,
Executor cleanupExecutor,
CleanupFn<T> cleanupFn,
- Collection<T> prioritizedCleanup,
- Collection<T> regularCleanup,
+ Collection<CleanupWithLabel<T>> prioritizedCleanup,
+ Collection<CleanupWithLabel<T>> regularCleanup,
RetryStrategy retryStrategy) {
this.mainThreadExecutor = mainThreadExecutor;
this.cleanupExecutor = cleanupExecutor;
@@ -166,22 +177,79 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
mainThreadExecutor.assertRunningInMainThread();
CompletableFuture<Void> cleanupFuture = FutureUtils.completedVoidFuture();
- for (T cleanup : prioritizedCleanup) {
- cleanupFuture = cleanupFuture.thenCompose(ignoredValue -> withRetry(jobId, cleanup));
+ for (CleanupWithLabel<T> cleanupWithLabel : prioritizedCleanup) {
+ cleanupFuture =
+ cleanupFuture.thenCompose(
+ ignoredValue ->
+ withRetry(
+ jobId,
+ cleanupWithLabel.getLabel(),
+ cleanupWithLabel.getCleanup()));
}
return cleanupFuture.thenCompose(
ignoredValue ->
FutureUtils.completeAll(
regularCleanup.stream()
- .map(cleanup -> withRetry(jobId, cleanup))
+ .map(
+ cleanupWithLabel ->
+ withRetry(
+ jobId,
+ cleanupWithLabel.getLabel(),
+ cleanupWithLabel.getCleanup()))
.collect(Collectors.toList())));
}
- private CompletableFuture<Void> withRetry(JobID jobId, T cleanup) {
+ private CompletableFuture<Void> withRetry(JobID jobId, String label, T cleanup) {
return FutureUtils.retryWithDelay(
- () -> cleanupFn.cleanupAsync(cleanup, jobId, cleanupExecutor),
+ () ->
+ cleanupFn
+ .cleanupAsync(cleanup, jobId, cleanupExecutor)
+ .whenComplete(
+ (value, throwable) -> {
+ if (throwable != null) {
+ final String logMessage =
+ String.format(
+ "Cleanup of %s failed for job %s due to a %s: %s",
+ label,
+ jobId,
+ throwable
+ .getClass()
+ .getSimpleName(),
+ throwable.getMessage());
+ if (LOG.isTraceEnabled()) {
+ LOG.warn(logMessage, throwable);
+ } else {
+ LOG.warn(logMessage);
+ }
+ }
+ }),
retryStrategy,
mainThreadExecutor);
}
+
+ /**
+ * {@code CleanupWithLabel} makes it possible to attach a label to a given cleanup that can be
+ * used as a human-readable representation of the corresponding cleanup.
+ *
+ * @param <CLEANUP_TYPE> The type of cleanup.
+ */
+ private static class CleanupWithLabel<CLEANUP_TYPE> {
+
+ private final CLEANUP_TYPE cleanup;
+ private final String label;
+
+ public CleanupWithLabel(CLEANUP_TYPE cleanup, String label) {
+ this.cleanup = cleanup;
+ this.label = label;
+ }
+
+ public CLEANUP_TYPE getCleanup() {
+ return cleanup;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
index 3fd6dc4..f14d7aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
@@ -43,6 +43,12 @@ import java.util.concurrent.Executor;
*/
public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory {
+ private static final String JOB_MANAGER_RUNNER_REGISTRY_LABEL = "JobManagerRunnerRegistry";
+ private static final String JOB_GRAPH_STORE_LABEL = "JobGraphStore";
+ private static final String BLOB_SERVER_LABEL = "BlobServer";
+ private static final String HA_SERVICES_LABEL = "HighAvailabilityServices";
+ private static final String JOB_MANAGER_METRIC_GROUP_LABEL = "JobManagerMetricGroup";
+
private final Executor cleanupExecutor;
private final RetryStrategy retryStrategy;
@@ -89,10 +95,10 @@ public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory
ComponentMainThreadExecutor mainThreadExecutor) {
return DefaultResourceCleaner.forLocallyCleanableResources(
mainThreadExecutor, cleanupExecutor, retryStrategy)
- .withPrioritizedCleanup(jobManagerRunnerRegistry)
- .withRegularCleanup(jobGraphWriter)
- .withRegularCleanup(blobServer)
- .withRegularCleanup(jobManagerMetricGroup)
+ .withPrioritizedCleanup(JOB_MANAGER_RUNNER_REGISTRY_LABEL, jobManagerRunnerRegistry)
+ .withRegularCleanup(JOB_GRAPH_STORE_LABEL, jobGraphWriter)
+ .withRegularCleanup(BLOB_SERVER_LABEL, blobServer)
+ .withRegularCleanup(JOB_MANAGER_METRIC_GROUP_LABEL, jobManagerMetricGroup)
.build();
}
@@ -101,11 +107,14 @@ public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory
ComponentMainThreadExecutor mainThreadExecutor) {
return DefaultResourceCleaner.forGloballyCleanableResources(
mainThreadExecutor, cleanupExecutor, retryStrategy)
- .withPrioritizedCleanup(ofLocalResource(jobManagerRunnerRegistry))
- .withRegularCleanup(jobGraphWriter)
- .withRegularCleanup(blobServer)
- .withRegularCleanup(highAvailabilityServices)
- .withRegularCleanup(ofLocalResource(jobManagerMetricGroup))
+ .withPrioritizedCleanup(
+ JOB_MANAGER_RUNNER_REGISTRY_LABEL,
+ ofLocalResource(jobManagerRunnerRegistry))
+ .withRegularCleanup(JOB_GRAPH_STORE_LABEL, jobGraphWriter)
+ .withRegularCleanup(BLOB_SERVER_LABEL, blobServer)
+ .withRegularCleanup(HA_SERVICES_LABEL, highAvailabilityServices)
+ .withRegularCleanup(
+ JOB_MANAGER_METRIC_GROUP_LABEL, ofLocalResource(jobManagerMetricGroup))
.build();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
index df16df7..39d332d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
@@ -34,6 +34,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
@@ -61,8 +62,8 @@ public class DefaultResourceCleanerTest {
final CompletableFuture<Void> cleanupResult =
createTestInstanceBuilder()
- .withRegularCleanup(cleanup0)
- .withRegularCleanup(cleanup1)
+ .withRegularCleanup("Reg #0", cleanup0)
+ .withRegularCleanup("Reg #1", cleanup1)
.build()
.cleanupAsync(JOB_ID);
@@ -84,8 +85,8 @@ public class DefaultResourceCleanerTest {
final CompletableFuture<Void> cleanupResult =
createTestInstanceBuilder()
- .withRegularCleanup(cleanup0)
- .withRegularCleanup(cleanup1)
+ .withRegularCleanup("Reg #0", cleanup0)
+ .withRegularCleanup("Reg #1", cleanup1)
.build()
.cleanupAsync(JOB_ID);
@@ -105,6 +106,7 @@ public class DefaultResourceCleanerTest {
.hasExactlyElementsOfTypes(
ExecutionException.class,
FutureUtils.RetryException.class,
+ CompletionException.class,
expectedException.getClass())
.last()
.isEqualTo(expectedException);
@@ -117,8 +119,8 @@ public class DefaultResourceCleanerTest {
final CompletableFuture<Void> cleanupResult =
createTestInstanceBuilder()
- .withRegularCleanup(cleanup0)
- .withRegularCleanup(cleanup1)
+ .withRegularCleanup("Reg #0", cleanup0)
+ .withRegularCleanup("Reg #1", cleanup1)
.build()
.cleanupAsync(JOB_ID);
@@ -138,6 +140,7 @@ public class DefaultResourceCleanerTest {
.hasExactlyElementsOfTypes(
ExecutionException.class,
FutureUtils.RetryException.class,
+ CompletionException.class,
expectedException.getClass())
.last()
.isEqualTo(expectedException);
@@ -154,10 +157,10 @@ public class DefaultResourceCleanerTest {
final DefaultResourceCleaner<CleanupCallback> testInstance =
createTestInstanceBuilder()
- .withPrioritizedCleanup(highPriorityCleanup)
- .withPrioritizedCleanup(lowerThanHighPriorityCleanup)
- .withRegularCleanup(noPriorityCleanup0)
- .withRegularCleanup(noPriorityCleanup1)
+ .withPrioritizedCleanup("Prio #0", highPriorityCleanup)
+ .withPrioritizedCleanup("Prio #1", lowerThanHighPriorityCleanup)
+ .withRegularCleanup("Reg #0", noPriorityCleanup0)
+ .withRegularCleanup("Reg #1", noPriorityCleanup1)
.build();
final CompletableFuture<Void> overallCleanupResult = testInstance.cleanupAsync(JOB_ID);
@@ -189,10 +192,10 @@ public class DefaultResourceCleanerTest {
final DefaultResourceCleaner<CleanupCallback> testInstance =
createTestInstanceBuilder()
- .withPrioritizedCleanup(highPriorityCleanup)
- .withPrioritizedCleanup(lowerThanHighPriorityCleanup)
- .withRegularCleanup(noPriorityCleanup0)
- .withRegularCleanup(noPriorityCleanup1)
+ .withPrioritizedCleanup("Prio #0", highPriorityCleanup)
+ .withPrioritizedCleanup("Prio #1", lowerThanHighPriorityCleanup)
+ .withRegularCleanup("Reg #0", noPriorityCleanup0)
+ .withRegularCleanup("Reg #1", noPriorityCleanup1)
.build();
assertThat(highPriorityCleanup.isDone()).isFalse();
@@ -224,8 +227,8 @@ public class DefaultResourceCleanerTest {
final CompletableFuture<Void> compositeCleanupResult =
createTestInstanceBuilder(TestingRetryStrategies.createWithNumberOfRetries(2))
- .withRegularCleanup(cleanupWithRetries)
- .withRegularCleanup(oneRunCleanup)
+ .withRegularCleanup("Reg #0", cleanupWithRetries)
+ .withRegularCleanup("Reg #1", oneRunCleanup)
.build()
.cleanupAsync(JOB_ID);
@@ -246,9 +249,9 @@ public class DefaultResourceCleanerTest {
final CompletableFuture<Void> compositeCleanupResult =
createTestInstanceBuilder(TestingRetryStrategies.createWithNumberOfRetries(1))
- .withPrioritizedCleanup(cleanupWithRetry)
- .withPrioritizedCleanup(oneRunHigherPriorityCleanup)
- .withRegularCleanup(oneRunCleanup)
+ .withPrioritizedCleanup("Prio #0", cleanupWithRetry)
+ .withPrioritizedCleanup("Prio #1", oneRunHigherPriorityCleanup)
+ .withRegularCleanup("Reg #0", oneRunCleanup)
.build()
.cleanupAsync(JOB_ID);