Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/09 11:22:15 UTC

[GitHub] [flink] zentol commented on a change in pull request #18987: [FLINK-26494][runtime] Cleanup does not reveal exceptions

zentol commented on a change in pull request #18987:
URL: https://github.com/apache/flink/pull/18987#discussion_r822542232



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
##########
@@ -166,21 +188,49 @@ private DefaultResourceCleaner(
         mainThreadExecutor.assertRunningInMainThread();
 
         CompletableFuture<Void> cleanupFuture = FutureUtils.completedVoidFuture();
-        for (T cleanup : prioritizedCleanup) {
-            cleanupFuture = cleanupFuture.thenCompose(ignoredValue -> withRetry(jobId, cleanup));
+        for (Map.Entry<String, T> cleanup : prioritizedCleanup.entrySet()) {
+            cleanupFuture =
+                    cleanupFuture.thenCompose(
+                            ignoredValue -> withRetry(jobId, cleanup.getKey(), cleanup.getValue()));
         }
 
         return cleanupFuture.thenCompose(
                 ignoredValue ->
                         FutureUtils.completeAll(
-                                regularCleanup.stream()
-                                        .map(cleanup -> withRetry(jobId, cleanup))
+                                regularCleanup.entrySet().stream()
+                                        .map(
+                                                cleanupEntry ->
+                                                        withRetry(
+                                                                jobId,
+                                                                cleanupEntry.getKey(),
+                                                                cleanupEntry.getValue()))
                                         .collect(Collectors.toList())));
     }
 
-    private CompletableFuture<Void> withRetry(JobID jobId, T cleanup) {
+    private CompletableFuture<Void> withRetry(JobID jobId, String label, T cleanup) {
         return FutureUtils.retryWithDelay(
-                () -> cleanupFn.cleanupAsync(cleanup, jobId, cleanupExecutor),
+                () ->
+                        cleanupFn
+                                .cleanupAsync(cleanup, jobId, cleanupExecutor)
+                                .whenComplete(
+                                        (value, throwable) -> {
+                                            if (throwable != null) {
+                                                final String logMessage =
+                                                        String.format(
+                                                                "Cleanup of %s failed for job %s due to a %s: %s",
+                                                                label,
+                                                                jobId,
+                                                                throwable
+                                                                        .getClass()
+                                                                        .getSimpleName(),
+                                                                throwable.getMessage());
+                                                if (LOG.isTraceEnabled()) {
+                                                    LOG.trace(logMessage, throwable);

Review comment:
       ```suggestion
                                                       LOG.warn(logMessage, throwable);
       ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
##########
@@ -97,8 +104,8 @@
 
         private final RetryStrategy retryStrategy;
 
-        private final Collection<T> prioritizedCleanup = new ArrayList<>();
-        private final Collection<T> regularCleanup = new ArrayList<>();
+        private final LinkedHashMap<String, T> prioritizedCleanup = new LinkedHashMap<>();
+        private final Map<String, T> regularCleanup = new HashMap<>();

Review comment:
       Why not encapsulate the label and T in a pojo?
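
       For illustration only (not part of this PR), a minimal sketch of what such a holder could look like; the class name CleanupWithLabel and its exact shape are assumptions:

       ```java
       /**
        * Hypothetical value object pairing a cleanup resource with a human-readable
        * label, instead of keeping both sides of the pair in a Map<String, T>.
        */
       final class CleanupWithLabel<CLEANUP_TYPE> {

           private final String label;
           private final CLEANUP_TYPE cleanup;

           CleanupWithLabel(String label, CLEANUP_TYPE cleanup) {
               this.label = label;
               this.cleanup = cleanup;
           }

           String getLabel() {
               return label;
           }

           CLEANUP_TYPE getCleanup() {
               return cleanup;
           }
       }
       ```

       The prioritized and regular cleanups could then be plain List<CleanupWithLabel<T>> fields, which preserves insertion order without relying on LinkedHashMap semantics.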

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
##########
@@ -165,8 +171,16 @@ public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {
 
     @Override
     public Set<JobResult> getDirtyResultsInternal() throws IOException {
+        final FileStatus[] statuses = fileSystem.listStatus(this.basePath);
+        if (statuses == null) {
+            LOG.warn(
+                    "The JobResultStore directory '"
+                            + basePath
+                            + "' was deleted. No persisted JobResults could be recovered.");
+            return Collections.emptySet();

Review comment:
       Isn't this expected if you start out in a fresh environment?
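
       For context, a hedged sketch (not from the PR) of how the null return from listStatus could be treated as a normal fresh-start case rather than a warning; the DEBUG level and message wording are assumptions:

       ```java
       // Hypothetical variant of the quoted lines: a missing base path is expected in a
       // fresh environment, so report it at DEBUG instead of WARN and return no results.
       final FileStatus[] statuses = fileSystem.listStatus(this.basePath);
       if (statuses == null) {
           LOG.debug(
                   "No JobResultStore directory found under '{}'; no dirty JobResults to recover.",
                   basePath);
           return Collections.emptySet();
       }
       ```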

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
##########
@@ -166,21 +188,49 @@ private DefaultResourceCleaner(
         mainThreadExecutor.assertRunningInMainThread();
 
         CompletableFuture<Void> cleanupFuture = FutureUtils.completedVoidFuture();
-        for (T cleanup : prioritizedCleanup) {
-            cleanupFuture = cleanupFuture.thenCompose(ignoredValue -> withRetry(jobId, cleanup));
+        for (Map.Entry<String, T> cleanup : prioritizedCleanup.entrySet()) {
+            cleanupFuture =
+                    cleanupFuture.thenCompose(
+                            ignoredValue -> withRetry(jobId, cleanup.getKey(), cleanup.getValue()));
         }
 
         return cleanupFuture.thenCompose(
                 ignoredValue ->
                         FutureUtils.completeAll(
-                                regularCleanup.stream()
-                                        .map(cleanup -> withRetry(jobId, cleanup))
+                                regularCleanup.entrySet().stream()
+                                        .map(
+                                                cleanupEntry ->
+                                                        withRetry(
+                                                                jobId,
+                                                                cleanupEntry.getKey(),
+                                                                cleanupEntry.getValue()))
                                         .collect(Collectors.toList())));
     }
 
-    private CompletableFuture<Void> withRetry(JobID jobId, T cleanup) {
+    private CompletableFuture<Void> withRetry(JobID jobId, String label, T cleanup) {
         return FutureUtils.retryWithDelay(
-                () -> cleanupFn.cleanupAsync(cleanup, jobId, cleanupExecutor),
+                () ->
+                        cleanupFn
+                                .cleanupAsync(cleanup, jobId, cleanupExecutor)
+                                .whenComplete(

Review comment:
       Isn't this swallowing the exception?
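
       For reference, a small standalone sketch (plain java.util.concurrent.CompletableFuture, independent of the Flink code) of the semantics in question: whenComplete passes the original exception through to downstream stages as long as the logging action itself does not throw. The class name WhenCompletePropagationDemo is made up for this example.

       ```java
       import java.util.concurrent.CompletableFuture;
       import java.util.concurrent.CompletionException;

       public class WhenCompletePropagationDemo {

           public static void main(String[] args) {
               final CompletableFuture<Void> cleanup = new CompletableFuture<>();
               cleanup.completeExceptionally(new IllegalStateException("cleanup failed"));

               // Mirrors the pattern above: observe the outcome purely for logging.
               final CompletableFuture<Void> observed =
                       cleanup.whenComplete(
                               (value, throwable) -> System.out.println("logged: " + throwable));

               try {
                   observed.join();
               } catch (CompletionException e) {
                   // The downstream stage still completes exceptionally with the original
                   // cause, so retryWithDelay (or any other consumer) still sees the failure.
                   System.out.println("propagated: " + e.getCause());
               }
           }
       }
       ```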




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org