You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/10 14:16:18 UTC

[flink] 03/03: [FLINK-26494][runtime] Adds log message to cleanup failure

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5352fc55972420ed5bf1afdfd97834540b1407a
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Fri Mar 4 21:51:30 2022 +0100

    [FLINK-26494][runtime] Adds log message to cleanup failure
---
 .../dispatcher/cleanup/DefaultResourceCleaner.java | 102 +++++++++++++++++----
 .../cleanup/DispatcherResourceCleanerFactory.java  |  27 ++++--
 .../cleanup/DefaultResourceCleanerTest.java        |  41 +++++----
 3 files changed, 125 insertions(+), 45 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
index 6226f74..77c40d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.RetryStrategy;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -37,12 +40,14 @@ import java.util.stream.Collectors;
  */
 public class DefaultResourceCleaner<T> implements ResourceCleaner {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceCleaner.class);
+
     private final ComponentMainThreadExecutor mainThreadExecutor;
     private final Executor cleanupExecutor;
     private final CleanupFn<T> cleanupFn;
 
-    private final Collection<T> prioritizedCleanup;
-    private final Collection<T> regularCleanup;
+    private final Collection<CleanupWithLabel<T>> prioritizedCleanup;
+    private final Collection<CleanupWithLabel<T>> regularCleanup;
 
     private final RetryStrategy retryStrategy;
 
@@ -97,8 +102,8 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
 
         private final RetryStrategy retryStrategy;
 
-        private final Collection<T> prioritizedCleanup = new ArrayList<>();
-        private final Collection<T> regularCleanup = new ArrayList<>();
+        private final Collection<CleanupWithLabel<T>> prioritizedCleanup = new ArrayList<>();
+        private final Collection<CleanupWithLabel<T>> regularCleanup = new ArrayList<>();
 
         private Builder(
                 ComponentMainThreadExecutor mainThreadExecutor,
@@ -117,10 +122,13 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
          * resources are added matters, i.e. if two cleanable resources are added as prioritized
          * cleanup tasks, the resource being added first will block the cleanup of the second
          * resource. All prioritized cleanup resources will run and finish before any resource that
-         * is added using {@link #withRegularCleanup(Object)} is started.
+         * is added using {@link #withRegularCleanup(String, Object)} is started.
+         *
+         * @param label The label being used when logging errors in the given cleanup.
+         * @param prioritizedCleanup The cleanup callback that is going to be prioritized.
          */
-        public Builder<T> withPrioritizedCleanup(T prioritizedCleanup) {
-            this.prioritizedCleanup.add(prioritizedCleanup);
+        public Builder<T> withPrioritizedCleanup(String label, T prioritizedCleanup) {
+            this.prioritizedCleanup.add(new CleanupWithLabel<>(prioritizedCleanup, label));
             return this;
         }
 
@@ -128,10 +136,13 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
          * Regular cleanups are resources for which the cleanup is triggered after all prioritized
          * cleanups succeeded. All added regular cleanups will run concurrently to each other.
          *
-         * @see #withPrioritizedCleanup(Object)
+         * @param label The label being used when logging errors in the given cleanup.
+         * @param regularCleanup The cleanup callback that is going to run after all prioritized
+         *     cleanups are finished.
+         * @see #withPrioritizedCleanup(String, Object)
          */
-        public Builder<T> withRegularCleanup(T regularCleanup) {
-            this.regularCleanup.add(regularCleanup);
+        public Builder<T> withRegularCleanup(String label, T regularCleanup) {
+            this.regularCleanup.add(new CleanupWithLabel<>(regularCleanup, label));
             return this;
         }
 
@@ -150,8 +161,8 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
             ComponentMainThreadExecutor mainThreadExecutor,
             Executor cleanupExecutor,
             CleanupFn<T> cleanupFn,
-            Collection<T> prioritizedCleanup,
-            Collection<T> regularCleanup,
+            Collection<CleanupWithLabel<T>> prioritizedCleanup,
+            Collection<CleanupWithLabel<T>> regularCleanup,
             RetryStrategy retryStrategy) {
         this.mainThreadExecutor = mainThreadExecutor;
         this.cleanupExecutor = cleanupExecutor;
@@ -166,22 +177,79 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
         mainThreadExecutor.assertRunningInMainThread();
 
         CompletableFuture<Void> cleanupFuture = FutureUtils.completedVoidFuture();
-        for (T cleanup : prioritizedCleanup) {
-            cleanupFuture = cleanupFuture.thenCompose(ignoredValue -> withRetry(jobId, cleanup));
+        for (CleanupWithLabel<T> cleanupWithLabel : prioritizedCleanup) {
+            cleanupFuture =
+                    cleanupFuture.thenCompose(
+                            ignoredValue ->
+                                    withRetry(
+                                            jobId,
+                                            cleanupWithLabel.getLabel(),
+                                            cleanupWithLabel.getCleanup()));
         }
 
         return cleanupFuture.thenCompose(
                 ignoredValue ->
                         FutureUtils.completeAll(
                                 regularCleanup.stream()
-                                        .map(cleanup -> withRetry(jobId, cleanup))
+                                        .map(
+                                                cleanupWithLabel ->
+                                                        withRetry(
+                                                                jobId,
+                                                                cleanupWithLabel.getLabel(),
+                                                                cleanupWithLabel.getCleanup()))
                                         .collect(Collectors.toList())));
     }
 
-    private CompletableFuture<Void> withRetry(JobID jobId, T cleanup) {
+    private CompletableFuture<Void> withRetry(JobID jobId, String label, T cleanup) {
         return FutureUtils.retryWithDelay(
-                () -> cleanupFn.cleanupAsync(cleanup, jobId, cleanupExecutor),
+                () ->
+                        cleanupFn
+                                .cleanupAsync(cleanup, jobId, cleanupExecutor)
+                                .whenComplete(
+                                        (value, throwable) -> {
+                                            if (throwable != null) {
+                                                final String logMessage =
+                                                        String.format(
+                                                                "Cleanup of %s failed for job %s due to a %s: %s",
+                                                                label,
+                                                                jobId,
+                                                                throwable
+                                                                        .getClass()
+                                                                        .getSimpleName(),
+                                                                throwable.getMessage());
+                                                if (LOG.isTraceEnabled()) {
+                                                    LOG.warn(logMessage, throwable);
+                                                } else {
+                                                    LOG.warn(logMessage);
+                                                }
+                                            }
+                                        }),
                 retryStrategy,
                 mainThreadExecutor);
     }
+
+    /**
+     * {@code CleanupWithLabel} makes it possible to attach a label to a given cleanup that can be
+     * used as a human-readable representation of the corresponding cleanup.
+     *
+     * @param <CLEANUP_TYPE> The type of cleanup.
+     */
+    private static class CleanupWithLabel<CLEANUP_TYPE> {
+
+        private final CLEANUP_TYPE cleanup;
+        private final String label;
+
+        public CleanupWithLabel(CLEANUP_TYPE cleanup, String label) {
+            this.cleanup = cleanup;
+            this.label = label;
+        }
+
+        public CLEANUP_TYPE getCleanup() {
+            return cleanup;
+        }
+
+        public String getLabel() {
+            return label;
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
index 3fd6dc4..f14d7aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
@@ -43,6 +43,12 @@ import java.util.concurrent.Executor;
  */
 public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory {
 
+    private static final String JOB_MANAGER_RUNNER_REGISTRY_LABEL = "JobManagerRunnerRegistry";
+    private static final String JOB_GRAPH_STORE_LABEL = "JobGraphStore";
+    private static final String BLOB_SERVER_LABEL = "BlobServer";
+    private static final String HA_SERVICES_LABEL = "HighAvailabilityServices";
+    private static final String JOB_MANAGER_METRIC_GROUP_LABEL = "JobManagerMetricGroup";
+
     private final Executor cleanupExecutor;
     private final RetryStrategy retryStrategy;
 
@@ -89,10 +95,10 @@ public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory
             ComponentMainThreadExecutor mainThreadExecutor) {
         return DefaultResourceCleaner.forLocallyCleanableResources(
                         mainThreadExecutor, cleanupExecutor, retryStrategy)
-                .withPrioritizedCleanup(jobManagerRunnerRegistry)
-                .withRegularCleanup(jobGraphWriter)
-                .withRegularCleanup(blobServer)
-                .withRegularCleanup(jobManagerMetricGroup)
+                .withPrioritizedCleanup(JOB_MANAGER_RUNNER_REGISTRY_LABEL, jobManagerRunnerRegistry)
+                .withRegularCleanup(JOB_GRAPH_STORE_LABEL, jobGraphWriter)
+                .withRegularCleanup(BLOB_SERVER_LABEL, blobServer)
+                .withRegularCleanup(JOB_MANAGER_METRIC_GROUP_LABEL, jobManagerMetricGroup)
                 .build();
     }
 
@@ -101,11 +107,14 @@ public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory
             ComponentMainThreadExecutor mainThreadExecutor) {
         return DefaultResourceCleaner.forGloballyCleanableResources(
                         mainThreadExecutor, cleanupExecutor, retryStrategy)
-                .withPrioritizedCleanup(ofLocalResource(jobManagerRunnerRegistry))
-                .withRegularCleanup(jobGraphWriter)
-                .withRegularCleanup(blobServer)
-                .withRegularCleanup(highAvailabilityServices)
-                .withRegularCleanup(ofLocalResource(jobManagerMetricGroup))
+                .withPrioritizedCleanup(
+                        JOB_MANAGER_RUNNER_REGISTRY_LABEL,
+                        ofLocalResource(jobManagerRunnerRegistry))
+                .withRegularCleanup(JOB_GRAPH_STORE_LABEL, jobGraphWriter)
+                .withRegularCleanup(BLOB_SERVER_LABEL, blobServer)
+                .withRegularCleanup(HA_SERVICES_LABEL, highAvailabilityServices)
+                .withRegularCleanup(
+                        JOB_MANAGER_METRIC_GROUP_LABEL, ofLocalResource(jobManagerMetricGroup))
                 .build();
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
index df16df7..39d332d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
@@ -34,6 +34,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -61,8 +62,8 @@ public class DefaultResourceCleanerTest {
 
         final CompletableFuture<Void> cleanupResult =
                 createTestInstanceBuilder()
-                        .withRegularCleanup(cleanup0)
-                        .withRegularCleanup(cleanup1)
+                        .withRegularCleanup("Reg #0", cleanup0)
+                        .withRegularCleanup("Reg #1", cleanup1)
                         .build()
                         .cleanupAsync(JOB_ID);
 
@@ -84,8 +85,8 @@ public class DefaultResourceCleanerTest {
 
         final CompletableFuture<Void> cleanupResult =
                 createTestInstanceBuilder()
-                        .withRegularCleanup(cleanup0)
-                        .withRegularCleanup(cleanup1)
+                        .withRegularCleanup("Reg #0", cleanup0)
+                        .withRegularCleanup("Reg #1", cleanup1)
                         .build()
                         .cleanupAsync(JOB_ID);
 
@@ -105,6 +106,7 @@ public class DefaultResourceCleanerTest {
                 .hasExactlyElementsOfTypes(
                         ExecutionException.class,
                         FutureUtils.RetryException.class,
+                        CompletionException.class,
                         expectedException.getClass())
                 .last()
                 .isEqualTo(expectedException);
@@ -117,8 +119,8 @@ public class DefaultResourceCleanerTest {
 
         final CompletableFuture<Void> cleanupResult =
                 createTestInstanceBuilder()
-                        .withRegularCleanup(cleanup0)
-                        .withRegularCleanup(cleanup1)
+                        .withRegularCleanup("Reg #0", cleanup0)
+                        .withRegularCleanup("Reg #1", cleanup1)
                         .build()
                         .cleanupAsync(JOB_ID);
 
@@ -138,6 +140,7 @@ public class DefaultResourceCleanerTest {
                 .hasExactlyElementsOfTypes(
                         ExecutionException.class,
                         FutureUtils.RetryException.class,
+                        CompletionException.class,
                         expectedException.getClass())
                 .last()
                 .isEqualTo(expectedException);
@@ -154,10 +157,10 @@ public class DefaultResourceCleanerTest {
 
         final DefaultResourceCleaner<CleanupCallback> testInstance =
                 createTestInstanceBuilder()
-                        .withPrioritizedCleanup(highPriorityCleanup)
-                        .withPrioritizedCleanup(lowerThanHighPriorityCleanup)
-                        .withRegularCleanup(noPriorityCleanup0)
-                        .withRegularCleanup(noPriorityCleanup1)
+                        .withPrioritizedCleanup("Prio #0", highPriorityCleanup)
+                        .withPrioritizedCleanup("Prio #1", lowerThanHighPriorityCleanup)
+                        .withRegularCleanup("Reg #0", noPriorityCleanup0)
+                        .withRegularCleanup("Reg #1", noPriorityCleanup1)
                         .build();
 
         final CompletableFuture<Void> overallCleanupResult = testInstance.cleanupAsync(JOB_ID);
@@ -189,10 +192,10 @@ public class DefaultResourceCleanerTest {
 
         final DefaultResourceCleaner<CleanupCallback> testInstance =
                 createTestInstanceBuilder()
-                        .withPrioritizedCleanup(highPriorityCleanup)
-                        .withPrioritizedCleanup(lowerThanHighPriorityCleanup)
-                        .withRegularCleanup(noPriorityCleanup0)
-                        .withRegularCleanup(noPriorityCleanup1)
+                        .withPrioritizedCleanup("Prio #0", highPriorityCleanup)
+                        .withPrioritizedCleanup("Prio #1", lowerThanHighPriorityCleanup)
+                        .withRegularCleanup("Reg #0", noPriorityCleanup0)
+                        .withRegularCleanup("Reg #1", noPriorityCleanup1)
                         .build();
 
         assertThat(highPriorityCleanup.isDone()).isFalse();
@@ -224,8 +227,8 @@ public class DefaultResourceCleanerTest {
 
         final CompletableFuture<Void> compositeCleanupResult =
                 createTestInstanceBuilder(TestingRetryStrategies.createWithNumberOfRetries(2))
-                        .withRegularCleanup(cleanupWithRetries)
-                        .withRegularCleanup(oneRunCleanup)
+                        .withRegularCleanup("Reg #0", cleanupWithRetries)
+                        .withRegularCleanup("Reg #1", oneRunCleanup)
                         .build()
                         .cleanupAsync(JOB_ID);
 
@@ -246,9 +249,9 @@ public class DefaultResourceCleanerTest {
 
         final CompletableFuture<Void> compositeCleanupResult =
                 createTestInstanceBuilder(TestingRetryStrategies.createWithNumberOfRetries(1))
-                        .withPrioritizedCleanup(cleanupWithRetry)
-                        .withPrioritizedCleanup(oneRunHigherPriorityCleanup)
-                        .withRegularCleanup(oneRunCleanup)
+                        .withPrioritizedCleanup("Prio #0", cleanupWithRetry)
+                        .withPrioritizedCleanup("Prio #1", oneRunHigherPriorityCleanup)
+                        .withRegularCleanup("Reg #0", oneRunCleanup)
                         .build()
                         .cleanupAsync(JOB_ID);