Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/15 15:53:59 UTC

[GitHub] [flink] zentol commented on a change in pull request #19102: [FLINK-26652][runtime] Makes Flink cluster not fail in case of the cleanup retries being exhausted

zentol commented on a change in pull request #19102:
URL: https://github.com/apache/flink/pull/19102#discussion_r827129983



##########
File path: docs/content.zh/docs/deployment/overview.md
##########
@@ -158,6 +158,11 @@ Once a job has reached a globally terminal state of either finished, failed or c
 external component resources associated with the job are then cleaned up. In the event of a
 failure when cleaning up a resource, Flink will attempt to retry the cleanup. You can
 [configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+Reaching the maximum number of retries without succeeding will leave the job in a dirty state.
+Its artifacts (and its [JobResultStore]({{< ref "docs/deployment/ha/overview#jobresultstore" >}})
+entry like in Application Mode) would need to be cleaned up manually. Restarting the very same

Review comment:
       ```suggestion
   entry) would need to be cleaned up manually. Restarting the very same
   ```
   Reads a bit weird, and I'm not sure why application mode should be explicitly mentioned.

##########
File path: docs/content/docs/deployment/overview.md
##########
@@ -158,7 +158,12 @@ When deploying Flink, there are often multiple options available for each buildi
 Once a job has reached a globally terminal state of either finished, failed or cancelled, the
 external component resources associated with the job are then cleaned up. In the event of a
 failure when cleaning up a resource, Flink will attempt to retry the cleanup. You can
-[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used. 
+Reaching the maximum number of retries without succeeding will leave the job in a dirty state. 

Review comment:
       Do we have any documentation that explains the consequences of being in a dirty state?
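
       For orientation, the behaviour the added sentence describes can be sketched as a bounded retry loop. This is only an illustration under assumptions: `BoundedCleanupRetrySketch`, `Outcome` and `cleanupWithRetries` are hypothetical names, not Flink classes or configuration options, and "dirty" here just means that the loop gave up. The real retry strategy is configurable and may wait between attempts.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch, not Flink code: retry a cleanup a bounded number of times.
final class BoundedCleanupRetrySketch {

    // CLEANED: an attempt succeeded. DIRTY: retries were exhausted, manual cleanup needed.
    enum Outcome { CLEANED, DIRTY }

    // Runs the attempt up to maxAttempts times and stops at the first success.
    static Outcome cleanupWithRetries(BooleanSupplier attempt, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return Outcome.CLEANED;
            }
        }
        return Outcome.DIRTY;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // The assumed cleanup succeeds on its third call.
        System.out.println(cleanupWithRetries(() -> calls.incrementAndGet() >= 3, 5));
        // A cleanup that never succeeds ends up dirty.
        System.out.println(cleanupWithRetries(() -> false, 5));
    }
}
```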

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -615,7 +615,16 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy
 
         final CompletableFuture<Void> jobTerminationFuture =
                 cleanupJobStateFuture.thenCompose(
-                        cleanupJobState -> removeJob(jobId, cleanupJobState));
+                        cleanupJobState ->
+                                removeJob(jobId, cleanupJobState)
+                                        .exceptionally(
+                                                throwable -> {
+                                                    log.warn(
+                                                            "The cleanup of job {} failed. The job's artifacts and its JobResultStore entry needs to be cleaned manually.",
+                                                            jobId,

Review comment:
       Would it be possible to list the paths that need to be cleaned up?
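
       A minimal sketch of the pattern in this hunk, using only `java.util.concurrent.CompletableFuture`: attaching `exceptionally` to the cleanup future turns a failed cleanup into a normal completion, so the termination future composed from it does not fail. `CleanupFailureSketch`, `removeJob`, `terminationFuture` and the `warnings` list (a stand-in for the logger) are hypothetical and do not mirror the actual `Dispatcher` signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not the Dispatcher implementation.
final class CleanupFailureSketch {

    // Stand-in for the cleanup step: completes normally or exceptionally.
    static CompletableFuture<Void> removeJob(String jobId, boolean fail) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (fail) {
            result.completeExceptionally(
                    new RuntimeException("Unable to remove job graph for " + jobId));
        } else {
            result.complete(null);
        }
        return result;
    }

    // Composes the cleanup and swallows its failure after recording a warning.
    static CompletableFuture<Void> terminationFuture(
            String jobId, boolean cleanupFails, List<String> warnings) {
        return CompletableFuture.<Void>completedFuture(null)
                .thenCompose(
                        ignored ->
                                removeJob(jobId, cleanupFails)
                                        .exceptionally(
                                                throwable -> {
                                                    warnings.add(
                                                            "The cleanup of job " + jobId
                                                                    + " failed: "
                                                                    + throwable.getMessage());
                                                    return null;
                                                }));
    }

    public static void main(String[] args) {
        List<String> warnings = new ArrayList<>();
        CompletableFuture<Void> future = terminationFuture("job-1", true, warnings);
        System.out.println("failed: " + future.isCompletedExceptionally());
        System.out.println("warnings: " + warnings);
    }
}
```

       Listing the affected paths, as asked above, would amount to adding them to the warning message at the point where the failure is swallowed.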

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -288,7 +304,7 @@ public void testCleanupAfterLeadershipChange() throws Exception {
         dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
         waitForJobToFinish(leaderElectionService, dispatcherGateway, jobId);
-        jobGraphRemovalErrorReceived.await();
+        firstCleanupFailsLatch.trigger();

Review comment:
       I don't see where we guarantee that the cleanup isn't re-run immediately and finishes before the leadership has changed.
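
       To illustrate the concern with a sketch: a one-shot gate only holds back callers until it is triggered. The example below uses `java.util.concurrent.CountDownLatch` as a stand-in for the test's `OneShotLatch` (an assumption about equivalent behaviour), and `OneShotGateSketch` with its methods is hypothetical. Once the gate is open, the failing first attempt and the retry both run without blocking.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the test code: a gate that is opened exactly once.
final class OneShotGateSketch {

    private final CountDownLatch firstCleanupGate = new CountDownLatch(1);
    private final AtomicInteger cleanupCalls = new AtomicInteger();

    // Opens the gate; every later await() returns immediately.
    void trigger() {
        firstCleanupGate.countDown();
    }

    // Waits for the gate, fails on the first call, succeeds afterwards.
    boolean cleanup() {
        try {
            firstCleanupGate.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return cleanupCalls.getAndIncrement() >= 1;
    }

    // Triggers the gate, then retries until cleanup succeeds; returns the attempt count.
    static int attemptsUntilSuccess() {
        OneShotGateSketch sketch = new OneShotGateSketch();
        sketch.trigger();
        int attempts = 1;
        while (!sketch.cleanup()) {
            attempts++;
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println("attempts: " + attemptsUntilSuccess());
    }
}
```

       In this sketch nothing separates the retry from the first attempt, so a second gate or a check on the retry would be needed to guarantee that the successful cleanup happens only after the leadership change.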

##########
File path: docs/content/docs/deployment/overview.md
##########
@@ -158,7 +158,12 @@ When deploying Flink, there are often multiple options available for each buildi
 Once a job has reached a globally terminal state of either finished, failed or cancelled, the
 external component resources associated with the job are then cleaned up. In the event of a
 failure when cleaning up a resource, Flink will attempt to retry the cleanup. You can
-[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used. 
+Reaching the maximum number of retries without succeeding will leave the job in a dirty state. 
+Its artifacts (and its [JobResultStore]({{< ref "docs/deployment/ha/overview#jobresultstore" >}}) 
+entry like in Application Mode) would need to be cleaned up manually. Restarting the very same 
+job (i.e. using the same job ID) would result in the retryable cleanup being picked up again 

Review comment:
       ```suggestion
   job (i.e. using the same job ID) will result in the cleanup being restarted 
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -249,11 +259,30 @@ public void testCleanupAfterLeadershipChange() throws Exception {
 
         // Construct job graph store.
         final AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
+        final OneShotLatch firstCleanupFailsLatch = new OneShotLatch();
         final OneShotLatch successfulCleanupLatch = new OneShotLatch();
-        final RuntimeException temporaryError = new RuntimeException("Unable to remove job graph.");
         final JobGraphStore jobGraphStore =
-                createAndStartJobGraphStoreWithCleanupFailures(
-                        1, temporaryError, actualGlobalCleanupCallCount, successfulCleanupLatch);
+                TestingJobGraphStore.newBuilder()
+                        .setGlobalCleanupFunction(
+                                (ignoredJobId, ignoredExecutor) -> {
+                                    try {
+                                        firstCleanupFailsLatch.await();
+                                    } catch (InterruptedException e) {
+                                        throw new CompletionException(e);
+                                    }
+
+                                    if (actualGlobalCleanupCallCount.getAndIncrement() < 1) {
+                                        return FutureUtils.completedExceptionally(
+                                                new RuntimeException(
+                                                        "Expected RuntimeException: Unable to remove job graph."));
+                                    }
+
+                                    successfulCleanupLatch.trigger();
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+
+        jobGraphStore.start(null);

Review comment:
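       Same as for the `jobGraphStore.start(null)` call in `testCleanupThroughRetries`: pass `NoOpJobGraphListener.INSTANCE` instead of `null`.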

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -133,12 +132,23 @@ public void testCleanupThroughRetries() throws Exception {
         final int numberOfErrors = 5;
         final RuntimeException temporaryError =
                 new RuntimeException("Expected RuntimeException: Unable to remove job graph.");
+        final AtomicInteger failureCount = new AtomicInteger(numberOfErrors);
         final JobGraphStore jobGraphStore =
-                createAndStartJobGraphStoreWithCleanupFailures(
-                        numberOfErrors,
-                        temporaryError,
-                        actualGlobalCleanupCallCount,
-                        successfulCleanupLatch);
+                TestingJobGraphStore.newBuilder()
+                        .setGlobalCleanupFunction(
+                                (ignoredJobId, ignoredExecutor) -> {
+                                    actualGlobalCleanupCallCount.incrementAndGet();
+
+                                    if (failureCount.getAndDecrement() > 0) {
+                                        return FutureUtils.completedExceptionally(temporaryError);
+                                    }
+
+                                    successfulCleanupLatch.trigger();
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+
+        jobGraphStore.start(null);

Review comment:
       ```suggestion
           jobGraphStore.start(NoOpJobGraphListener.INSTANCE);
   ```



