You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/10 14:16:15 UTC

[flink] branch master updated (254c918 -> c5352fc)

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 254c918  [FLINK-26550][checkpoint] Correct the information of checkpoint failure
     new f0fe63e  [hotfix][runtime] Adds missing @ExtendWith to DefaultResourceCleanerTest
     new 6ea1782  [hotfix][runtime] Adds check for consistency to avoid NullPointerException
     new c5352fc  [FLINK-26494][runtime] Adds log message to cleanup failure

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../dispatcher/cleanup/DefaultResourceCleaner.java | 102 +++++++++++++++++----
 .../cleanup/DispatcherResourceCleanerFactory.java  |  27 ++++--
 .../highavailability/FileSystemJobResultStore.java |   7 +-
 .../cleanup/DefaultResourceCleanerTest.java        |  44 +++++----
 4 files changed, 134 insertions(+), 46 deletions(-)

[flink] 03/03: [FLINK-26494][runtime] Adds log message to cleanup failure

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5352fc55972420ed5bf1afdfd97834540b1407a
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Fri Mar 4 21:51:30 2022 +0100

    [FLINK-26494][runtime] Adds log message to cleanup failure
---
 .../dispatcher/cleanup/DefaultResourceCleaner.java | 102 +++++++++++++++++----
 .../cleanup/DispatcherResourceCleanerFactory.java  |  27 ++++--
 .../cleanup/DefaultResourceCleanerTest.java        |  41 +++++----
 3 files changed, 125 insertions(+), 45 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
index 6226f74..77c40d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.RetryStrategy;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -37,12 +40,14 @@ import java.util.stream.Collectors;
  */
 public class DefaultResourceCleaner<T> implements ResourceCleaner {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceCleaner.class);
+
     private final ComponentMainThreadExecutor mainThreadExecutor;
     private final Executor cleanupExecutor;
     private final CleanupFn<T> cleanupFn;
 
-    private final Collection<T> prioritizedCleanup;
-    private final Collection<T> regularCleanup;
+    private final Collection<CleanupWithLabel<T>> prioritizedCleanup;
+    private final Collection<CleanupWithLabel<T>> regularCleanup;
 
     private final RetryStrategy retryStrategy;
 
@@ -97,8 +102,8 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
 
         private final RetryStrategy retryStrategy;
 
-        private final Collection<T> prioritizedCleanup = new ArrayList<>();
-        private final Collection<T> regularCleanup = new ArrayList<>();
+        private final Collection<CleanupWithLabel<T>> prioritizedCleanup = new ArrayList<>();
+        private final Collection<CleanupWithLabel<T>> regularCleanup = new ArrayList<>();
 
         private Builder(
                 ComponentMainThreadExecutor mainThreadExecutor,
@@ -117,10 +122,13 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
          * resources are added matters, i.e. if two cleanable resources are added as prioritized
          * cleanup tasks, the resource being added first will block the cleanup of the second
          * resource. All prioritized cleanup resources will run and finish before any resource that
-         * is added using {@link #withRegularCleanup(Object)} is started.
+         * is added using {@link #withRegularCleanup(String, Object)} is started.
+         *
+         * @param label The label being used when logging errors in the given cleanup.
+         * @param prioritizedCleanup The cleanup callback that is going to be prioritized.
          */
-        public Builder<T> withPrioritizedCleanup(T prioritizedCleanup) {
-            this.prioritizedCleanup.add(prioritizedCleanup);
+        public Builder<T> withPrioritizedCleanup(String label, T prioritizedCleanup) {
+            this.prioritizedCleanup.add(new CleanupWithLabel<>(prioritizedCleanup, label));
             return this;
         }
 
@@ -128,10 +136,13 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
          * Regular cleanups are resources for which the cleanup is triggered after all prioritized
          * cleanups succeeded. All added regular cleanups will run concurrently to each other.
          *
-         * @see #withPrioritizedCleanup(Object)
+         * @param label The label being used when logging errors in the given cleanup.
+         * @param regularCleanup The cleanup callback that is going to run after all prioritized
+         *     cleanups are finished.
+         * @see #withPrioritizedCleanup(String, Object)
          */
-        public Builder<T> withRegularCleanup(T regularCleanup) {
-            this.regularCleanup.add(regularCleanup);
+        public Builder<T> withRegularCleanup(String label, T regularCleanup) {
+            this.regularCleanup.add(new CleanupWithLabel<>(regularCleanup, label));
             return this;
         }
 
@@ -150,8 +161,8 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
             ComponentMainThreadExecutor mainThreadExecutor,
             Executor cleanupExecutor,
             CleanupFn<T> cleanupFn,
-            Collection<T> prioritizedCleanup,
-            Collection<T> regularCleanup,
+            Collection<CleanupWithLabel<T>> prioritizedCleanup,
+            Collection<CleanupWithLabel<T>> regularCleanup,
             RetryStrategy retryStrategy) {
         this.mainThreadExecutor = mainThreadExecutor;
         this.cleanupExecutor = cleanupExecutor;
@@ -166,22 +177,79 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner {
         mainThreadExecutor.assertRunningInMainThread();
 
         CompletableFuture<Void> cleanupFuture = FutureUtils.completedVoidFuture();
-        for (T cleanup : prioritizedCleanup) {
-            cleanupFuture = cleanupFuture.thenCompose(ignoredValue -> withRetry(jobId, cleanup));
+        for (CleanupWithLabel<T> cleanupWithLabel : prioritizedCleanup) {
+            cleanupFuture =
+                    cleanupFuture.thenCompose(
+                            ignoredValue ->
+                                    withRetry(
+                                            jobId,
+                                            cleanupWithLabel.getLabel(),
+                                            cleanupWithLabel.getCleanup()));
         }
 
         return cleanupFuture.thenCompose(
                 ignoredValue ->
                         FutureUtils.completeAll(
                                 regularCleanup.stream()
-                                        .map(cleanup -> withRetry(jobId, cleanup))
+                                        .map(
+                                                cleanupWithLabel ->
+                                                        withRetry(
+                                                                jobId,
+                                                                cleanupWithLabel.getLabel(),
+                                                                cleanupWithLabel.getCleanup()))
                                         .collect(Collectors.toList())));
     }
 
-    private CompletableFuture<Void> withRetry(JobID jobId, T cleanup) {
+    private CompletableFuture<Void> withRetry(JobID jobId, String label, T cleanup) {
         return FutureUtils.retryWithDelay(
-                () -> cleanupFn.cleanupAsync(cleanup, jobId, cleanupExecutor),
+                () ->
+                        cleanupFn
+                                .cleanupAsync(cleanup, jobId, cleanupExecutor)
+                                .whenComplete(
+                                        (value, throwable) -> {
+                                            if (throwable != null) {
+                                                final String logMessage =
+                                                        String.format(
+                                                                "Cleanup of %s failed for job %s due to a %s: %s",
+                                                                label,
+                                                                jobId,
+                                                                throwable
+                                                                        .getClass()
+                                                                        .getSimpleName(),
+                                                                throwable.getMessage());
+                                                if (LOG.isTraceEnabled()) {
+                                                    LOG.warn(logMessage, throwable);
+                                                } else {
+                                                    LOG.warn(logMessage);
+                                                }
+                                            }
+                                        }),
                 retryStrategy,
                 mainThreadExecutor);
     }
+
+    /**
+     * {@code CleanupWithLabel} makes it possible to attach a label to a given cleanup that can be
+     * used as a human-readable representation of the corresponding cleanup.
+     *
+     * @param <CLEANUP_TYPE> The type of cleanup.
+     */
+    private static class CleanupWithLabel<CLEANUP_TYPE> {
+
+        private final CLEANUP_TYPE cleanup;
+        private final String label;
+
+        public CleanupWithLabel(CLEANUP_TYPE cleanup, String label) {
+            this.cleanup = cleanup;
+            this.label = label;
+        }
+
+        public CLEANUP_TYPE getCleanup() {
+            return cleanup;
+        }
+
+        public String getLabel() {
+            return label;
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
index 3fd6dc4..f14d7aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
@@ -43,6 +43,12 @@ import java.util.concurrent.Executor;
  */
 public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory {
 
+    private static final String JOB_MANAGER_RUNNER_REGISTRY_LABEL = "JobManagerRunnerRegistry";
+    private static final String JOB_GRAPH_STORE_LABEL = "JobGraphStore";
+    private static final String BLOB_SERVER_LABEL = "BlobServer";
+    private static final String HA_SERVICES_LABEL = "HighAvailabilityServices";
+    private static final String JOB_MANAGER_METRIC_GROUP_LABEL = "JobManagerMetricGroup";
+
     private final Executor cleanupExecutor;
     private final RetryStrategy retryStrategy;
 
@@ -89,10 +95,10 @@ public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory
             ComponentMainThreadExecutor mainThreadExecutor) {
         return DefaultResourceCleaner.forLocallyCleanableResources(
                         mainThreadExecutor, cleanupExecutor, retryStrategy)
-                .withPrioritizedCleanup(jobManagerRunnerRegistry)
-                .withRegularCleanup(jobGraphWriter)
-                .withRegularCleanup(blobServer)
-                .withRegularCleanup(jobManagerMetricGroup)
+                .withPrioritizedCleanup(JOB_MANAGER_RUNNER_REGISTRY_LABEL, jobManagerRunnerRegistry)
+                .withRegularCleanup(JOB_GRAPH_STORE_LABEL, jobGraphWriter)
+                .withRegularCleanup(BLOB_SERVER_LABEL, blobServer)
+                .withRegularCleanup(JOB_MANAGER_METRIC_GROUP_LABEL, jobManagerMetricGroup)
                 .build();
     }
 
@@ -101,11 +107,14 @@ public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory
             ComponentMainThreadExecutor mainThreadExecutor) {
         return DefaultResourceCleaner.forGloballyCleanableResources(
                         mainThreadExecutor, cleanupExecutor, retryStrategy)
-                .withPrioritizedCleanup(ofLocalResource(jobManagerRunnerRegistry))
-                .withRegularCleanup(jobGraphWriter)
-                .withRegularCleanup(blobServer)
-                .withRegularCleanup(highAvailabilityServices)
-                .withRegularCleanup(ofLocalResource(jobManagerMetricGroup))
+                .withPrioritizedCleanup(
+                        JOB_MANAGER_RUNNER_REGISTRY_LABEL,
+                        ofLocalResource(jobManagerRunnerRegistry))
+                .withRegularCleanup(JOB_GRAPH_STORE_LABEL, jobGraphWriter)
+                .withRegularCleanup(BLOB_SERVER_LABEL, blobServer)
+                .withRegularCleanup(HA_SERVICES_LABEL, highAvailabilityServices)
+                .withRegularCleanup(
+                        JOB_MANAGER_METRIC_GROUP_LABEL, ofLocalResource(jobManagerMetricGroup))
                 .build();
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
index df16df7..39d332d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
@@ -34,6 +34,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -61,8 +62,8 @@ public class DefaultResourceCleanerTest {
 
         final CompletableFuture<Void> cleanupResult =
                 createTestInstanceBuilder()
-                        .withRegularCleanup(cleanup0)
-                        .withRegularCleanup(cleanup1)
+                        .withRegularCleanup("Reg #0", cleanup0)
+                        .withRegularCleanup("Reg #1", cleanup1)
                         .build()
                         .cleanupAsync(JOB_ID);
 
@@ -84,8 +85,8 @@ public class DefaultResourceCleanerTest {
 
         final CompletableFuture<Void> cleanupResult =
                 createTestInstanceBuilder()
-                        .withRegularCleanup(cleanup0)
-                        .withRegularCleanup(cleanup1)
+                        .withRegularCleanup("Reg #0", cleanup0)
+                        .withRegularCleanup("Reg #1", cleanup1)
                         .build()
                         .cleanupAsync(JOB_ID);
 
@@ -105,6 +106,7 @@ public class DefaultResourceCleanerTest {
                 .hasExactlyElementsOfTypes(
                         ExecutionException.class,
                         FutureUtils.RetryException.class,
+                        CompletionException.class,
                         expectedException.getClass())
                 .last()
                 .isEqualTo(expectedException);
@@ -117,8 +119,8 @@ public class DefaultResourceCleanerTest {
 
         final CompletableFuture<Void> cleanupResult =
                 createTestInstanceBuilder()
-                        .withRegularCleanup(cleanup0)
-                        .withRegularCleanup(cleanup1)
+                        .withRegularCleanup("Reg #0", cleanup0)
+                        .withRegularCleanup("Reg #1", cleanup1)
                         .build()
                         .cleanupAsync(JOB_ID);
 
@@ -138,6 +140,7 @@ public class DefaultResourceCleanerTest {
                 .hasExactlyElementsOfTypes(
                         ExecutionException.class,
                         FutureUtils.RetryException.class,
+                        CompletionException.class,
                         expectedException.getClass())
                 .last()
                 .isEqualTo(expectedException);
@@ -154,10 +157,10 @@ public class DefaultResourceCleanerTest {
 
         final DefaultResourceCleaner<CleanupCallback> testInstance =
                 createTestInstanceBuilder()
-                        .withPrioritizedCleanup(highPriorityCleanup)
-                        .withPrioritizedCleanup(lowerThanHighPriorityCleanup)
-                        .withRegularCleanup(noPriorityCleanup0)
-                        .withRegularCleanup(noPriorityCleanup1)
+                        .withPrioritizedCleanup("Prio #0", highPriorityCleanup)
+                        .withPrioritizedCleanup("Prio #1", lowerThanHighPriorityCleanup)
+                        .withRegularCleanup("Reg #0", noPriorityCleanup0)
+                        .withRegularCleanup("Reg #1", noPriorityCleanup1)
                         .build();
 
         final CompletableFuture<Void> overallCleanupResult = testInstance.cleanupAsync(JOB_ID);
@@ -189,10 +192,10 @@ public class DefaultResourceCleanerTest {
 
         final DefaultResourceCleaner<CleanupCallback> testInstance =
                 createTestInstanceBuilder()
-                        .withPrioritizedCleanup(highPriorityCleanup)
-                        .withPrioritizedCleanup(lowerThanHighPriorityCleanup)
-                        .withRegularCleanup(noPriorityCleanup0)
-                        .withRegularCleanup(noPriorityCleanup1)
+                        .withPrioritizedCleanup("Prio #0", highPriorityCleanup)
+                        .withPrioritizedCleanup("Prio #1", lowerThanHighPriorityCleanup)
+                        .withRegularCleanup("Reg #0", noPriorityCleanup0)
+                        .withRegularCleanup("Reg #1", noPriorityCleanup1)
                         .build();
 
         assertThat(highPriorityCleanup.isDone()).isFalse();
@@ -224,8 +227,8 @@ public class DefaultResourceCleanerTest {
 
         final CompletableFuture<Void> compositeCleanupResult =
                 createTestInstanceBuilder(TestingRetryStrategies.createWithNumberOfRetries(2))
-                        .withRegularCleanup(cleanupWithRetries)
-                        .withRegularCleanup(oneRunCleanup)
+                        .withRegularCleanup("Reg #0", cleanupWithRetries)
+                        .withRegularCleanup("Reg #1", oneRunCleanup)
                         .build()
                         .cleanupAsync(JOB_ID);
 
@@ -246,9 +249,9 @@ public class DefaultResourceCleanerTest {
 
         final CompletableFuture<Void> compositeCleanupResult =
                 createTestInstanceBuilder(TestingRetryStrategies.createWithNumberOfRetries(1))
-                        .withPrioritizedCleanup(cleanupWithRetry)
-                        .withPrioritizedCleanup(oneRunHigherPriorityCleanup)
-                        .withRegularCleanup(oneRunCleanup)
+                        .withPrioritizedCleanup("Prio #0", cleanupWithRetry)
+                        .withPrioritizedCleanup("Prio #1", oneRunHigherPriorityCleanup)
+                        .withRegularCleanup("Reg #0", oneRunCleanup)
                         .build()
                         .cleanupAsync(JOB_ID);
 

[flink] 02/03: [hotfix][runtime] Adds check for consistency to avoid NullPointerException

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6ea1782246c0099c8b00067558739850f31e5a68
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Fri Mar 4 14:39:35 2022 +0100

    [hotfix][runtime] Adds check for consistency to avoid NullPointerException
    
    LocalFileSystem.listStatus returns null in case of the path being invalid.
---
 .../flink/runtime/highavailability/FileSystemJobResultStore.java   | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
index 363f93e..a7ed3c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
@@ -167,8 +167,13 @@ public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore {
 
     @Override
     public Set<JobResult> getDirtyResultsInternal() throws IOException {
+        final FileStatus[] statuses = fileSystem.listStatus(this.basePath);
+
+        Preconditions.checkState(
+                statuses != null,
+                "The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.");
+
         final Set<JobResult> dirtyResults = new HashSet<>();
-        FileStatus[] statuses = fileSystem.listStatus(this.basePath);
         for (FileStatus s : statuses) {
             if (!s.isDir()) {
                 if (hasValidDirtyJobResultStoreEntryExtension(s.getPath().getName())) {

[flink] 01/03: [hotfix][runtime] Adds missing @ExtendWith to DefaultResourceCleanerTest

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f0fe63ee0ba1983f5a4204f4e55d8961d8465743
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Sun Mar 6 16:34:49 2022 +0100

    [hotfix][runtime] Adds missing @ExtendWith to DefaultResourceCleanerTest
---
 .../flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java   | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
index 36fef1c..df16df7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
@@ -22,11 +22,13 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.RetryStrategy;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -42,6 +44,7 @@ import static org.apache.flink.core.testutils.FlinkAssertions.STREAM_THROWABLE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** {@code DefaultResourceCleanerTest} tests {@link DefaultResourceCleaner}. */
+@ExtendWith(TestLoggerExtension.class)
 public class DefaultResourceCleanerTest {
 
     // runs with retry utilizes the ComponentMainThreadExecutor which adds concurrency despite using