Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/11 19:35:12 UTC

[GitHub] [flink] zentol opened a new pull request, #19427: [FLINK-27140][coordination] Write job result in ioExecutor

zentol opened a new pull request, #19427:
URL: https://github.com/apache/flink/pull/19427

   Prevents the main thread from performing a potentially long-running operation by moving it to the ioExecutor instead.
   
   Some tests had to be adjusted to wait on the runner termination future instead of the job result future. The job result future does not guarantee that the runner has been cleaned up, but these tests require that cleanup because the mocked JMRunner doesn't implement all required methods (once the runner has terminated, the Dispatcher handles everything).
   This previously worked because job execution, job termination and runner termination all happened synchronously: all futures were already completed and we never switched executors.
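   A minimal, self-contained sketch (plain `java.util.concurrent`, no Flink classes) of the effect described above: a callback attached to an already-completed `CompletableFuture` runs synchronously on the calling thread, whereas once the work is moved to a separate executor, the callback runs on whichever thread completes the future. The single-thread executor named `io-thread` is a hypothetical stand-in for the Dispatcher's `ioExecutor`; the latch only makes the ordering deterministic for the demo.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   public class ExecutorSwitchDemo {

       /** Name of the thread that runs a thenApply callback attached to an already-completed future. */
       static String callbackThreadForCompletedFuture() {
           // The future is already complete, so the callback executes immediately on the caller's thread.
           return CompletableFuture.completedFuture("job result")
                   .thenApply(ignored -> Thread.currentThread().getName())
                   .join();
       }

       /** Name of the thread that runs the same callback while the work is still pending on ioExecutor. */
       static String callbackThreadAfterIoHop(ExecutorService ioExecutor) {
           CountDownLatch release = new CountDownLatch(1);
           CompletableFuture<String> write =
                   CompletableFuture.supplyAsync(
                           () -> {
                               // Stands in for a slow, blocking write; waits until the callback is attached.
                               try {
                                   release.await();
                               } catch (InterruptedException e) {
                                   Thread.currentThread().interrupt();
                               }
                               return "written";
                           },
                           ioExecutor);
           // Attached while the write is still in flight, so it runs on the thread that completes it.
           CompletableFuture<String> callbackThread =
                   write.thenApply(ignored -> Thread.currentThread().getName());
           release.countDown();
           return callbackThread.join();
       }

       public static void main(String[] args) {
           ExecutorService ioExecutor =
                   Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "io-thread"));
           try {
               System.out.println(callbackThreadForCompletedFuture()); // the caller's thread, e.g. "main"
               System.out.println(callbackThreadAfterIoHop(ioExecutor)); // "io-thread"
           } finally {
               ioExecutor.shutdown();
           }
       }
   }
   ```

   Roughly speaking, this is why tests could previously assume that everything downstream of the job result had already happened by the time they observed it, and why that assumption breaks once the write is moved off the main thread.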


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on a diff in pull request #19427: [FLINK-27140][coordination] Write job result in ioExecutor

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19427:
URL: https://github.com/apache/flink/pull/19427#discussion_r848244397


##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -420,7 +420,7 @@ public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exc
                                         .build())));
 
         // wait for job to finish
-        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

Review Comment:
   By switching from `requestJobResult` to `getJobTerminationFuture`, we now always wait for the cleanup to finish before triggering whatever logic we want to test afterwards. That means the `JobManagerRunner` is always deregistered by then. I'm wondering whether we should add a test to `DispatcherTest` that covers the code path of `Dispatcher#requestJobStatus` where the `JobManagerRunner` is not yet deregistered. WDYT? 🤔 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1062,35 +1066,57 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
 
         archiveExecutionGraph(executionGraphInfo);
 
+        final CompletableFuture<Void> writeFuture = new CompletableFuture<>();
         if (terminalJobStatus.isGloballyTerminalState()) {
             final JobID jobId = executionGraphInfo.getJobId();
-            try {
-                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                    log.warn(
-                            "Job {} is already marked as clean but clean up was triggered again.",
-                            jobId);
-                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                    jobResultStore.createDirtyResult(
-                            new JobResultEntry(
-                                    JobResult.createFrom(
-                                            executionGraphInfo.getArchivedExecutionGraph())));
-                    log.info(
-                            "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
-                            jobId);
-                }
-            } catch (IOException e) {
-                fatalErrorHandler.onFatalError(
-                        new FlinkException(
-                                String.format(
-                                        "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
-                                        jobId),
-                                e));
-            }
+
+            ioExecutor.execute(
+                    () -> {
+                        try {
+                            if (jobResultStore.hasCleanJobResultEntry(jobId)) {
+                                log.warn(
+                                        "Job {} is already marked as clean but clean up was triggered again.",
+                                        jobId);
+                            } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
+                                jobResultStore.createDirtyResult(
+                                        new JobResultEntry(
+                                                JobResult.createFrom(
+                                                        executionGraphInfo
+                                                                .getArchivedExecutionGraph())));
+                                log.info(
+                                        "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
+                                        jobId);
+                            }
+                        } catch (IOException e) {
+                            writeFuture.completeExceptionally(e);
+                            return;
+                        }
+                        writeFuture.complete(null);
+                    });
+        } else {
+            writeFuture.complete(null);
         }
 
-        return terminalJobStatus.isGloballyTerminalState()
-                ? CleanupJobState.GLOBAL
-                : CleanupJobState.LOCAL;
+        return writeFuture
+                .handleAsync(
+                        (ignored, error) -> {
+                            if (error != null) {
+                                fatalErrorHandler.onFatalError(
+                                        new FlinkException(
+                                                String.format(
+                                                        "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
+                                                        executionGraphInfo.getJobId()),
+                                                error));
+                            }
+                            return null;
+                        },
+                        getMainThreadExecutor())
+                .thenApply(
+                        (ignored) -> {
+                            return terminalJobStatus.isGloballyTerminalState()
+                                    ? CleanupJobState.GLOBAL
+                                    : CleanupJobState.LOCAL;
+                        });

Review Comment:
   ```
           if (!terminalJobStatus.isGloballyTerminalState()) {
               // Not globally terminal (e.g. SUSPENDED): local cleanup only, no JobResultStore entry.
               return CompletableFuture.completedFuture(CleanupJobState.LOCAL);
           }
   
           final JobID jobId = executionGraphInfo.getJobId();
           // Run the blocking JobResultStore access on the ioExecutor instead of the main thread.
           return CompletableFuture.runAsync(
                           () -> {
                               try {
                                   if (jobResultStore.hasCleanJobResultEntry(jobId)) {
                                       log.warn(
                                               "Job {} is already marked as clean but clean up was triggered again.",
                                               jobId);
                                   } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
                                       jobResultStore.createDirtyResult(
                                               new JobResultEntry(
                                                       JobResult.createFrom(
                                                               executionGraphInfo
                                                                       .getArchivedExecutionGraph())));
                                       log.info(
                                               "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
                                               jobId);
                                   }
                               } catch (IOException e) {
                                   throw new CompletionException(e);
                               }
                           },
                           ioExecutor)
                   // Switch back to the main thread to escalate a failed write as a fatal error.
                   .handleAsync(
                           (ignored, error) -> {
                               if (error != null) {
                                   fatalErrorHandler.onFatalError(
                                           new FlinkException(
                                                   String.format(
                                                           "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
                                                           jobId),
                                                   error));
                               }
                               return CleanupJobState.GLOBAL;
                           },
                           getMainThreadExecutor());
   ```
   That's more of a cosmetic change, but what about inverting the if condition and returning `CleanupJobState.LOCAL` early? That would give a clearer separation between the local and the global terminal-state handling in this method.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1215,15 +1241,10 @@ private CompletableFuture<Void> waitForTerminatingJob(
                 getMainThreadExecutor());
     }
 
+    @VisibleForTesting
     CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
-        if (jobManagerRunnerRegistry.isRegistered(jobId)) {

Review Comment:
   This method is only called from `Dispatcher#waitForTerminatingJob`, which in turn is only called from `Dispatcher#internalSubmitJob`, and `internalSubmitJob` is triggered within `Dispatcher#submitJob` only after verifying that no `JobManagerRunner` is registered for this `JobID` (see `Dispatcher#isDuplicateJob`, which is used in [Dispatcher#submitJob:435](https://github.com/apache/flink/blob/05707cf8955f190d65021d61c5afd8164e831315/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L435)). Therefore, this `DispatcherException` effectively acted as a state-invariant check whose violation was propagated to the user (the job submission would fail in that case).
   
   Removing this part is reasonable in my opinion. I'm just wondering whether we should still add a precondition check here so that the invariant remains covered.
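   For illustration, a minimal sketch of the kind of precondition check suggested here. It uses hypothetical, simplified bookkeeping (a `Set` of registered job IDs and a `Map` of termination futures, with `String` job IDs) rather than the actual `Dispatcher` fields; inside Flink this would more likely be a `Preconditions.checkState(...)` call against `jobManagerRunnerRegistry.isRegistered(jobId)`, whereas a plain `IllegalStateException` keeps the sketch dependency-free.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical, simplified stand-in for the Dispatcher's bookkeeping; not Flink's actual classes. */
   public class TerminationFutureLookup {
       // Job IDs with a registered runner (stand-in for jobManagerRunnerRegistry).
       private final Set<String> registeredRunners = new HashSet<>();
       // Termination futures of runners that are shutting down, keyed by job ID.
       private final Map<String, CompletableFuture<Void>> terminationFutures = new HashMap<>();

       void registerRunner(String jobId) {
           registeredRunners.add(jobId);
       }

       void addTerminationFuture(String jobId, CompletableFuture<Void> terminationFuture) {
           terminationFutures.put(jobId, terminationFuture);
       }

       CompletableFuture<Void> getJobTerminationFuture(String jobId) {
           // Invariant: duplicate submissions were already rejected upstream, so no runner
           // should still be registered for this job. Fail fast if that assumption is broken.
           if (registeredRunners.contains(jobId)) {
               throw new IllegalStateException("Job " + jobId + " is still registered as running.");
           }
           // A missing termination future just means there is nothing left to wait for.
           return terminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
       }

       public static void main(String[] args) {
           TerminationFutureLookup lookup = new TerminationFutureLookup();
           System.out.println(lookup.getJobTerminationFuture("job-a").isDone()); // true
           lookup.registerRunner("job-b");
           try {
               lookup.getJobTerminationFuture("job-b");
           } catch (IllegalStateException e) {
               System.out.println("invariant violated: " + e.getMessage());
           }
       }
   }
   ```

   Throwing here turns a broken invariant into an immediate programming error rather than a failed future that downstream code might treat as an ordinary submission failure.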





[GitHub] [flink] XComp commented on pull request #19427: [FLINK-27140][coordination] Write job result in ioExecutor

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #19427:
URL: https://github.com/apache/flink/pull/19427#issuecomment-1096706765

   Valid point. I created FLINK-27204 to cover this topic.







[GitHub] [flink] zentol commented on a diff in pull request #19427: [FLINK-27140][coordination] Write job result in ioExecutor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19427:
URL: https://github.com/apache/flink/pull/19427#discussion_r848287320


##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -420,7 +420,7 @@ public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exc
                                         .build())));
 
         // wait for job to finish
-        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

Review Comment:
   That's kinda covered by `testRequestMultipleJobDetails_returnsRunningOverSuspendedJob`; there, one of the mock JMRunners doesn't terminate and does implement the required methods.





[GitHub] [flink] zentol merged pull request #19427: [FLINK-27140][coordination] Write job result in ioExecutor

Posted by GitBox <gi...@apache.org>.
zentol merged PR #19427:
URL: https://github.com/apache/flink/pull/19427




[GitHub] [flink] zentol commented on a diff in pull request #19427: [FLINK-27140][coordination] Write job result in ioExecutor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19427:
URL: https://github.com/apache/flink/pull/19427#discussion_r848295455


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1215,15 +1241,10 @@ private CompletableFuture<Void> waitForTerminatingJob(
                 getMainThreadExecutor());
     }
 
+    @VisibleForTesting
     CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
-        if (jobManagerRunnerRegistry.isRegistered(jobId)) {

Review Comment:
   I don't think we're weakening any invariants. The previous ones stated that either
   a) the job is still running, or
   b) there _may_ be a termination future for the job.
   
   Since we no longer care about a), there isn't anything to assert, because b) already allowed for no termination future being present.
   
   I'm not even sure the previous behavior was correct; AFAICT, if `waitForTerminatingJob` returns a failed future because the job is already running, we then initiate the cleanup, which could actually wreck the running job, unless there's some safeguard in the resource cleaner.





[GitHub] [flink] XComp commented on a diff in pull request #19427: [FLINK-27140][coordination] Write job result in ioExecutor

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19427:
URL: https://github.com/apache/flink/pull/19427#discussion_r848349135


##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -420,7 +420,7 @@ public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exc
                                         .build())));
 
         // wait for job to finish
-        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

Review Comment:
   I'm not sure about that. `Dispatcher#requestJobResult` is not necessarily triggered before the job submission goes through in [DispatcherTest:969](https://github.com/apache/flink/blob/1cb80e6d1ed0331ba662e481705b6b716f29b26a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java#L969). Therefore, we might end up in the branch of `requestJobResult` again where the `Dispatcher` accesses the `ExecutionGraphInfoStore`.
   
   I was thinking about something like this: https://gist.github.com/XComp/6f082c15ce10ceae3f743f84b6bc6903





[GitHub] [flink] zentol commented on pull request #19427: [FLINK-27140][coordination] Write job result in ioExecutor

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #19427:
URL: https://github.com/apache/flink/pull/19427#issuecomment-1096698931

   > Ah, one more thing, I just remembered: We could also make the marking markJobAsClean call [...] be executed by the ioExecutor, couldn't we?
   
   Done.
   
   Although I am wondering whether we shouldn't make this the responsibility of the FileSystemJobResultStore at some point (which would require changes to the JRS interface to properly support async operations).
   As is, one could argue that `isDuplicateJob`/`isInGloballyTerminalState` should also run in the ioExecutor.
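   To make the idea of an async-capable store concrete, a minimal sketch. All names here (`BlockingResultStore`, `AsyncResultStore`, `hasDirtyEntryAsync`, `markCleanAsync`) are hypothetical and are not Flink's `JobResultStore` interface; the sketch simply wraps a blocking store and shifts each call onto a caller-supplied `ioExecutor`, so callers only ever see `CompletableFuture`s.

   ```java
   import java.io.IOException;
   import java.util.HashSet;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.concurrent.Executor;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   public class AsyncStoreSketch {

       /** Hypothetical blocking store, loosely modeled on the JobResultStore calls in this thread. */
       interface BlockingResultStore {
           boolean hasDirtyEntry(String jobId) throws IOException;

           void markClean(String jobId) throws IOException;
       }

       /** Hypothetical async facade: every blocking call is shifted onto the given ioExecutor. */
       static final class AsyncResultStore {
           private final BlockingResultStore delegate;
           private final Executor ioExecutor;

           AsyncResultStore(BlockingResultStore delegate, Executor ioExecutor) {
               this.delegate = delegate;
               this.ioExecutor = ioExecutor;
           }

           CompletableFuture<Boolean> hasDirtyEntryAsync(String jobId) {
               return CompletableFuture.supplyAsync(
                       () -> {
                           try {
                               return delegate.hasDirtyEntry(jobId);
                           } catch (IOException e) {
                               // Surfaces to callers as a CompletionException whose cause is the IOException.
                               throw new CompletionException(e);
                           }
                       },
                       ioExecutor);
           }

           CompletableFuture<Void> markCleanAsync(String jobId) {
               return CompletableFuture.runAsync(
                       () -> {
                           try {
                               delegate.markClean(jobId);
                           } catch (IOException e) {
                               throw new CompletionException(e);
                           }
                       },
                       ioExecutor);
           }
       }

       public static void main(String[] args) {
           // In-memory stand-in for a file-system-backed store.
           Set<String> dirty = new HashSet<>(Set.of("job-1"));
           BlockingResultStore inMemory =
                   new BlockingResultStore() {
                       @Override
                       public boolean hasDirtyEntry(String jobId) {
                           return dirty.contains(jobId);
                       }

                       @Override
                       public void markClean(String jobId) {
                           dirty.remove(jobId);
                       }
                   };
           ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
           AsyncResultStore store = new AsyncResultStore(inMemory, ioExecutor);
           store.markCleanAsync("job-1")
                   .thenCompose(ignored -> store.hasDirtyEntryAsync("job-1"))
                   .thenAccept(stillDirty -> System.out.println("still dirty: " + stillDirty))
                   .join(); // prints "still dirty: false"
           ioExecutor.shutdown();
       }
   }
   ```

   With this shape, the executor choice lives in one place instead of being repeated at call sites such as `isDuplicateJob`; the trade-off is that every caller has to become asynchronous.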




[GitHub] [flink] zentol commented on a diff in pull request #19427: [FLINK-27140][coordination] Write job result in ioExecutor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19427:
URL: https://github.com/apache/flink/pull/19427#discussion_r848401132


##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -420,7 +420,7 @@ public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exc
                                         .build())));
 
         // wait for job to finish
-        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

Review Comment:
   True, I was thinking of `requestMultipleJobDetails()`. In any case, the PR doesn't reduce test coverage; I only touched tests that used a JMRunner which would have failed if the job result had been requested while the job was still running.





[GitHub] [flink] flinkbot commented on pull request #19427: [FLINK-27140][coordination] Write job result in ioExecutor

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19427:
URL: https://github.com/apache/flink/pull/19427#issuecomment-1095484285

   ## CI report:
   
   * 05707cf8955f190d65021d61c5afd8164e831315 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] XComp commented on a diff in pull request #19427: [FLINK-27140][coordination] Write job result in ioExecutor

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19427:
URL: https://github.com/apache/flink/pull/19427#discussion_r848312679


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1215,15 +1241,10 @@ private CompletableFuture<Void> waitForTerminatingJob(
                 getMainThreadExecutor());
     }
 
+    @VisibleForTesting
     CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
-        if (jobManagerRunnerRegistry.isRegistered(jobId)) {

Review Comment:
   Good point about the cleanup. 👍 





[GitHub] [flink] XComp commented on pull request #19427: [FLINK-27140][coordination] Write job result in ioExecutor

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #19427:
URL: https://github.com/apache/flink/pull/19427#issuecomment-1096667718

   Ah, one more thing, I just remembered: we could also have the `markJobAsClean` call in [Dispatcher#removeJob:991](https://github.com/apache/flink/blob/5aaa075d585d0d71e15067ee91f8464149340ba6/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L991) be executed by the `ioExecutor`, couldn't we?
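   A minimal sketch of this suggestion, assuming a hypothetical `CleanMarker` interface in place of the real `markJobAsClean` call and a plain single-thread executor standing in for the Dispatcher's main-thread executor. The blocking call runs on the `ioExecutor`, and its outcome (success or failure) is always handled back on the main-thread executor, following the same shape as the dirty-result write discussed in this PR.

   ```java
   import java.io.IOException;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.concurrent.Executor;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   public class MarkCleanOnIoExecutor {

       /** Hypothetical stand-in for the blocking markJobAsClean call. */
       interface CleanMarker {
           void markJobAsClean(String jobId) throws IOException;
       }

       /**
        * Runs the blocking call on ioExecutor, then hops back to mainThreadExecutor to handle the outcome.
        * Returns "<handling thread>:<outcome>" so the thread switch is observable.
        */
       static CompletableFuture<String> markCleanThenHandle(
               CleanMarker marker, String jobId, Executor ioExecutor, Executor mainThreadExecutor) {
           return CompletableFuture.runAsync(
                           () -> {
                               try {
                                   marker.markJobAsClean(jobId);
                               } catch (IOException e) {
                                   throw new CompletionException(e);
                               }
                           },
                           ioExecutor)
                   .handleAsync(
                           (ignored, error) -> {
                               // Back on the "main thread": this is where errors could be logged or escalated.
                               String outcome = error == null ? "clean" : "failed";
                               return Thread.currentThread().getName() + ":" + outcome;
                           },
                           mainThreadExecutor);
       }

       public static void main(String[] args) {
           ExecutorService ioExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
           ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
           try {
               CleanMarker ok = jobId -> {};
               CleanMarker broken = jobId -> {
                   throw new IOException("simulated write failure");
               };
               System.out.println(markCleanThenHandle(ok, "job-1", ioExecutor, mainThreadExecutor).join()); // main-thread:clean
               System.out.println(markCleanThenHandle(broken, "job-2", ioExecutor, mainThreadExecutor).join()); // main-thread:failed
           } finally {
               ioExecutor.shutdown();
               mainThreadExecutor.shutdown();
           }
       }
   }
   ```

   Using `handleAsync` with an explicit executor guarantees that the follow-up runs on the main-thread executor even if the I/O future has already completed, which keeps Dispatcher state changes single-threaded.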

