Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/12 10:02:27 UTC

[GitHub] [flink] XComp commented on a diff in pull request #19427: [FLINK-27140][coordination] Write job result in ioExecutor

XComp commented on code in PR #19427:
URL: https://github.com/apache/flink/pull/19427#discussion_r848244397


##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -420,7 +420,7 @@ public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exc
                                         .build())));
 
         // wait for job to finish
-        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

Review Comment:
   By switching from `requestJobResult` to `getJobTerminationFuture`, we now always wait for the cleanup to finish before triggering whatever logic we want to test afterwards. That means the `JobManagerRunner` is always deregistered by then. I'm wondering whether we should add a test to `DispatcherTest` that covers the code path of `Dispatcher#requestJobStatus` where the `JobManagerRunner` is not yet deregistered. WDYT? 🤔 
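   To illustrate the difference, here is a minimal sketch in a simplified in-memory model. `ToyDispatcher`, `finish`, `cleanup` and `isRegistered` are hypothetical names, not Flink's `Dispatcher` API. The model assumes that the job result becomes available before cleanup deregisters the runner, so a test that only waits for the result can still observe a registered runner, whereas a test that waits for the termination future cannot.

   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical toy model (not Flink code): the job result is completed before
   // the runner is deregistered, so the two futures observe different states.
   class ToyDispatcher {
       enum Status { RUNNING, FINISHED }

       private final Map<String, CompletableFuture<Status>> runners = new ConcurrentHashMap<>();
       private final Map<String, Status> archive = new ConcurrentHashMap<>();
       private final Map<String, CompletableFuture<Void>> terminations = new ConcurrentHashMap<>();

       void submit(String jobId) {
           runners.put(jobId, new CompletableFuture<>());
           terminations.put(jobId, new CompletableFuture<>());
       }

       // Completes the job result; the runner stays registered until cleanup() runs.
       void finish(String jobId) {
           archive.put(jobId, Status.FINISHED);
           runners.get(jobId).complete(Status.FINISHED);
       }

       // Deregisters the runner and completes the termination future.
       void cleanup(String jobId) {
           runners.remove(jobId);
           terminations.get(jobId).complete(null);
       }

       CompletableFuture<Status> requestJobResult(String jobId) {
           return runners.get(jobId);
       }

       CompletableFuture<Void> getJobTerminationFuture(String jobId) {
           return terminations.get(jobId);
       }

       boolean isRegistered(String jobId) {
           return runners.containsKey(jobId);
       }

       // Answered by the registered runner if present, otherwise by the archive.
       Status requestJobStatus(String jobId) {
           CompletableFuture<Status> runner = runners.get(jobId);
           if (runner != null) {
               return runner.getNow(Status.RUNNING);
           }
           return archive.get(jobId);
       }

       public static void main(String[] args) {
           ToyDispatcher dispatcher = new ToyDispatcher();
           dispatcher.submit("job-1");
           dispatcher.finish("job-1");

           // Waiting only for the result leaves a window where the runner is still registered.
           dispatcher.requestJobResult("job-1").join();
           System.out.println("after result: registered=" + dispatcher.isRegistered("job-1"));

           // Waiting for termination means cleanup has already deregistered the runner.
           dispatcher.cleanup("job-1");
           dispatcher.getJobTerminationFuture("job-1").join();
           System.out.println("after termination: registered=" + dispatcher.isRegistered("job-1"));
       }
   }
   ```

   Running `main` prints `after result: registered=true` followed by `after termination: registered=false`. A test for the not-yet-deregistered code path needs some way to hold back the cleanup; the toy model does this by making `cleanup` an explicit call.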



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1062,35 +1066,57 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
 
         archiveExecutionGraph(executionGraphInfo);
 
+        final CompletableFuture<Void> writeFuture = new CompletableFuture<>();
         if (terminalJobStatus.isGloballyTerminalState()) {
             final JobID jobId = executionGraphInfo.getJobId();
-            try {
-                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                    log.warn(
-                            "Job {} is already marked as clean but clean up was triggered again.",
-                            jobId);
-                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                    jobResultStore.createDirtyResult(
-                            new JobResultEntry(
-                                    JobResult.createFrom(
-                                            executionGraphInfo.getArchivedExecutionGraph())));
-                    log.info(
-                            "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
-                            jobId);
-                }
-            } catch (IOException e) {
-                fatalErrorHandler.onFatalError(
-                        new FlinkException(
-                                String.format(
-                                        "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
-                                        jobId),
-                                e));
-            }
+
+            ioExecutor.execute(
+                    () -> {
+                        try {
+                            if (jobResultStore.hasCleanJobResultEntry(jobId)) {
+                                log.warn(
+                                        "Job {} is already marked as clean but clean up was triggered again.",
+                                        jobId);
+                            } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
+                                jobResultStore.createDirtyResult(
+                                        new JobResultEntry(
+                                                JobResult.createFrom(
+                                                        executionGraphInfo
+                                                                .getArchivedExecutionGraph())));
+                                log.info(
+                                        "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
+                                        jobId);
+                            }
+                        } catch (IOException e) {
+                            writeFuture.completeExceptionally(e);
+                            return;
+                        }
+                        writeFuture.complete(null);
+                    });
+        } else {
+            writeFuture.complete(null);
         }
 
-        return terminalJobStatus.isGloballyTerminalState()
-                ? CleanupJobState.GLOBAL
-                : CleanupJobState.LOCAL;
+        return writeFuture
+                .handleAsync(
+                        (ignored, error) -> {
+                            if (error != null) {
+                                fatalErrorHandler.onFatalError(
+                                        new FlinkException(
+                                                String.format(
+                                                        "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
+                                                        executionGraphInfo.getJobId()),
+                                                error));
+                            }
+                            return null;
+                        },
+                        getMainThreadExecutor())
+                .thenApply(
+                        (ignored) -> {
+                            return terminalJobStatus.isGloballyTerminalState()
+                                    ? CleanupJobState.GLOBAL
+                                    : CleanupJobState.LOCAL;
+                        });

Review Comment:
   ```
           if (!terminalJobStatus.isGloballyTerminalState()) {
               return CompletableFuture.completedFuture(CleanupJobState.LOCAL);
           }
   
           final JobID jobId = executionGraphInfo.getJobId();
           return CompletableFuture.runAsync(
                       () -> {
                           try {
                               if (jobResultStore.hasCleanJobResultEntry(jobId)) {
                                   log.warn(
                                           "Job {} is already marked as clean but clean up was triggered again.",
                                           jobId);
                               } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
                                   jobResultStore.createDirtyResult(
                                           new JobResultEntry(
                                                   JobResult.createFrom(
                                                           executionGraphInfo
                                                                   .getArchivedExecutionGraph())));
                                   log.info(
                                           "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
                                           jobId);
                               }
                           } catch (IOException e) {
                               throw new CompletionException(e);
                           }
                       },
                       ioExecutor)
                   .handleAsync(
                           (ignored, error) -> {
                               if (error != null) {
                                   fatalErrorHandler.onFatalError(
                                           new FlinkException(
                                                   String.format(
                                                           "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
                                                           executionGraphInfo.getJobId()),
                                                   error));
                               }
                               return CleanupJobState.GLOBAL;
                           },
                           getMainThreadExecutor());
   ```
   This is more of a cosmetic change, but what about inverting the if condition and returning `CleanupJobState.LOCAL` early? That would give us a clearer separation between the handling of the local and the global terminal states in this method.
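   A self-contained sketch of the suggested control flow, assuming hypothetical names (`CleanupStateResolver`, `DirtyResultWriter`, and a `Consumer<Throwable>` standing in for the fatal error handler) rather than Flink's actual types. The early return handles the local case. For the global case, the blocking write runs on the io executor via `CompletableFuture.runAsync(Runnable, Executor)`, and the error handling moves back to the main-thread executor via `handleAsync`.

   ```java
   import java.io.IOException;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.concurrent.Executor;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.function.Consumer;

   // Hypothetical sketch of the suggested structure; names are illustrative, not Flink APIs.
   class CleanupStateResolver {
       enum CleanupJobState { LOCAL, GLOBAL }

       // Stand-in for the blocking JobResultStore write.
       interface DirtyResultWriter {
           void writeDirtyResult(String jobId) throws IOException;
       }

       private final Executor ioExecutor;
       private final Executor mainThreadExecutor;
       private final DirtyResultWriter writer;
       private final Consumer<Throwable> fatalErrorHandler;

       CleanupStateResolver(
               Executor ioExecutor,
               Executor mainThreadExecutor,
               DirtyResultWriter writer,
               Consumer<Throwable> fatalErrorHandler) {
           this.ioExecutor = ioExecutor;
           this.mainThreadExecutor = mainThreadExecutor;
           this.writer = writer;
           this.fatalErrorHandler = fatalErrorHandler;
       }

       CompletableFuture<CleanupJobState> jobReachedTerminalState(String jobId, boolean globallyTerminal) {
           // Locally terminal jobs need no write: return early.
           if (!globallyTerminal) {
               return CompletableFuture.completedFuture(CleanupJobState.LOCAL);
           }
           return CompletableFuture.runAsync(
                           () -> {
                               // Blocking I/O runs on the io executor.
                               try {
                                   writer.writeDirtyResult(jobId);
                               } catch (IOException e) {
                                   throw new CompletionException(e);
                               }
                           },
                           ioExecutor)
                   .handleAsync(
                           (ignored, error) -> {
                               // Failures are reported from the main-thread executor.
                               if (error != null) {
                                   fatalErrorHandler.accept(
                                           new IllegalStateException(
                                                   "Job " + jobId + " couldn't be marked as pre-cleanup finished.",
                                                   error));
                               }
                               return CleanupJobState.GLOBAL;
                           },
                           mainThreadExecutor);
       }

       public static void main(String[] args) {
           ExecutorService io = Executors.newSingleThreadExecutor();
           ExecutorService mainThread = Executors.newSingleThreadExecutor();
           try {
               CleanupStateResolver resolver =
                       new CleanupStateResolver(
                               io, mainThread, jobId -> {}, t -> System.out.println("fatal: " + t.getMessage()));
               System.out.println(resolver.jobReachedTerminalState("job-1", false).join());
               System.out.println(resolver.jobReachedTerminalState("job-1", true).join());
           } finally {
               io.shutdown();
               mainThread.shutdown();
           }
       }
   }
   ```

   With the no-op writer, `main` prints `LOCAL` and then `GLOBAL`. As in the suggestion above, a failed write is reported to the fatal error handler and the future still resolves to `GLOBAL`.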



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1215,15 +1241,10 @@ private CompletableFuture<Void> waitForTerminatingJob(
                 getMainThreadExecutor());
     }
 
+    @VisibleForTesting
     CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
-        if (jobManagerRunnerRegistry.isRegistered(jobId)) {

Review Comment:
   In production code, this method is only called from `Dispatcher#waitForTerminatingJob`, which in turn is called from `Dispatcher#internalSubmitJob`. `internalSubmitJob` is triggered within `Dispatcher#submitJob` only after it has been verified that no `JobManagerRunner` is registered for this `JobID` (see `Dispatcher#isDuplicateJob`, which is used in [Dispatcher#submitJob:435](https://github.com/apache/flink/blob/05707cf8955f190d65021d61c5afd8164e831315/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L435)). Therefore, this `DispatcherException` effectively acted as a state invariant check whose violation was propagated to the user (the job submission would fail in that case).
   
   Removing this part seems reasonable to me. I'm just wondering whether we should still add a precondition here so that the invariant remains covered.
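   If a precondition is kept, it could look roughly like the following sketch. It assumes a hypothetical `TerminationFutureLookup` class with a plain `Set` as the runner registry; the local `checkState` helper only mimics the style of Flink's `Preconditions.checkState` (which throws an `IllegalStateException`) so that the example compiles without Flink on the classpath.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;

   // Hypothetical sketch (not Flink code): the "no registered runner" invariant is
   // checked explicitly instead of being handled as a regular error.
   class TerminationFutureLookup {
       private final Set<String> registeredRunners = new HashSet<>();
       private final Map<String, CompletableFuture<Void>> terminationFutures = new HashMap<>();

       void register(String jobId) {
           registeredRunners.add(jobId);
       }

       void trackTermination(String jobId, CompletableFuture<Void> terminationFuture) {
           terminationFutures.put(jobId, terminationFuture);
       }

       // Local stand-in for a checkState-style precondition helper.
       static void checkState(boolean condition, String template, Object... args) {
           if (!condition) {
               throw new IllegalStateException(String.format(template, args));
           }
       }

       CompletableFuture<Void> getJobTerminationFuture(String jobId) {
           // Callers are expected to have ruled out duplicate jobs already.
           checkState(
                   !registeredRunners.contains(jobId),
                   "Job %s must not have a registered JobManagerRunner at this point.",
                   jobId);
           // Fall back to an already-completed future if no termination is tracked.
           return terminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
       }

       public static void main(String[] args) {
           TerminationFutureLookup lookup = new TerminationFutureLookup();
           System.out.println(lookup.getJobTerminationFuture("job-1").isDone());

           lookup.register("job-2");
           try {
               lookup.getJobTerminationFuture("job-2");
           } catch (IllegalStateException e) {
               System.out.println(e.getMessage());
           }
       }
   }
   ```

   In this sketch, calling `getJobTerminationFuture` for a job that still has a registered runner throws an `IllegalStateException`; otherwise it returns the tracked termination future, or an already-completed one if none is tracked.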



-- 
This is an automated message from the Apache Git Service.