Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/25 07:52:23 UTC

[GitHub] [flink] XComp opened a new pull request, #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

XComp opened a new pull request, #19567:
URL: https://github.com/apache/flink/pull/19567

   ## What is the purpose of the change
   
   In job mode, we triggered the cluster shutdown as soon as the job reached a globally terminal state. This was fine in 1.14 and earlier because we made no promises about the cleanup anyway. With 1.15, we introduced retries for the cleanup, which means the final termination can take longer. During cluster shutdown, the ResourceManager is informed that the cluster is being deregistered, which results in the workers being shut down in the case of active RMs (i.e. k8s and YARN). See FLINK-26772 (the parent issue of this issue) for further details.
   
   ## Brief change log
   
   * Removed the override of `Dispatcher#jobReachedTerminalState` in `MiniDispatcher`
   * Introduced a new method that is called when the job has reached a globally terminal state; it is implemented by `MiniDispatcher`
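
The ordering described above can be illustrated with a minimal sketch. It is not the code from this PR: `ShutdownOrderingSketch`, `cleanupJob`, `onJobCleanedUp` and the string-based job status are hypothetical stand-ins, and the only point is that the shutdown hook runs after the cleanup future has completed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in, not Flink's Dispatcher: it only models the ordering.
class ShutdownOrderingSketch {

    // Records the order of events so that the ordering can be inspected.
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    // Completed once the cluster is allowed to shut down.
    final CompletableFuture<String> shutDownFuture = new CompletableFuture<>();

    // Stand-in for the (possibly retried) cleanup of job-related artifacts.
    CompletableFuture<Void> cleanupJob(String jobId) {
        return CompletableFuture.runAsync(() -> events.add("cleanup:" + jobId));
    }

    // Hook that a job-mode subclass could implement; it runs after the cleanup.
    void onJobCleanedUp(String jobId, String terminalStatus) {
        events.add("shutdown:" + terminalStatus);
        shutDownFuture.complete(terminalStatus);
    }

    // The shutdown hook is chained onto the cleanup instead of being called right away.
    CompletableFuture<Void> jobReachedGloballyTerminalState(String jobId, String terminalStatus) {
        events.add("terminal:" + terminalStatus);
        return cleanupJob(jobId).thenRun(() -> onJobCleanedUp(jobId, terminalStatus));
    }
}
```

Chaining the hook with `thenRun` keeps the shutdown dependent on the cleanup result, so a cleanup that takes longer because of retries simply delays the shutdown rather than racing with it.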
   
   ## Verifying this change
   
   * I extended existing tests to verify that the shutdown happens after the cleanup
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on a diff in pull request #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19567:
URL: https://github.com/apache/flink/pull/19567#discussion_r858602877


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -652,20 +653,47 @@ private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult(
                 && executionType == ExecutionType.RECOVERY) {
             return CompletableFuture.completedFuture(
                     jobManagerRunnerFailed(
-                            jobManagerRunnerResult.getExecutionGraphInfo().getJobId(),
+                            jobManagerRunnerResult,
                             jobManagerRunnerResult.getInitializationFailure()));
         }
         return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo());
     }
 
-    enum CleanupJobState {
-        LOCAL,
-        GLOBAL
+    private static class CleanupJobState {
+
+        private final JobStatus jobStatus;
+        private final boolean globalCleanup;
+
+        public static CleanupJobState localCleanup(JobStatus jobStatus) {
+            return new CleanupJobState(jobStatus, false);
+        }
+
+        public static CleanupJobState globalCleanup(JobStatus jobStatus) {
+            return new CleanupJobState(jobStatus, true);
+        }
+
+        private CleanupJobState(JobStatus jobStatus, boolean globalCleanup) {
+            this.jobStatus = jobStatus;
+            this.globalCleanup = globalCleanup;
+        }
+
+        public boolean isGlobalCleanup() {
+            return globalCleanup;
+        }
+
+        public JobStatus getJobStatus() {
+            return jobStatus;
+        }
     }
 
-    private CleanupJobState jobManagerRunnerFailed(JobID jobId, Throwable throwable) {
-        jobMasterFailed(jobId, throwable);
-        return CleanupJobState.LOCAL;
+    private CleanupJobState jobManagerRunnerFailed(
+            JobManagerRunnerResult jobManagerRunnerResult, Throwable throwable) {

Review Comment:
   I added a test case to cover this code path.





[GitHub] [flink] flinkbot commented on pull request #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19567:
URL: https://github.com/apache/flink/pull/19567#issuecomment-1108211546

   ## CI report:
   
   * c904fb021660771ba94a8698e8e20b6ad02c9e0e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] XComp commented on pull request #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #19567:
URL: https://github.com/apache/flink/pull/19567#issuecomment-1112144600

   Fixed compilation error and force-pushed...




[GitHub] [flink] autophagy commented on a diff in pull request #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

Posted by GitBox <gi...@apache.org>.
autophagy commented on code in PR #19567:
URL: https://github.com/apache/flink/pull/19567#discussion_r858497481


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -652,20 +653,47 @@ private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult(
                 && executionType == ExecutionType.RECOVERY) {
             return CompletableFuture.completedFuture(
                     jobManagerRunnerFailed(
-                            jobManagerRunnerResult.getExecutionGraphInfo().getJobId(),
+                            jobManagerRunnerResult,
                             jobManagerRunnerResult.getInitializationFailure()));
         }
         return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo());
     }
 
-    enum CleanupJobState {
-        LOCAL,
-        GLOBAL
+    private static class CleanupJobState {
+
+        private final JobStatus jobStatus;
+        private final boolean globalCleanup;
+
+        public static CleanupJobState localCleanup(JobStatus jobStatus) {
+            return new CleanupJobState(jobStatus, false);
+        }
+
+        public static CleanupJobState globalCleanup(JobStatus jobStatus) {
+            return new CleanupJobState(jobStatus, true);
+        }
+
+        private CleanupJobState(JobStatus jobStatus, boolean globalCleanup) {
+            this.jobStatus = jobStatus;
+            this.globalCleanup = globalCleanup;
+        }
+
+        public boolean isGlobalCleanup() {
+            return globalCleanup;
+        }
+
+        public JobStatus getJobStatus() {
+            return jobStatus;
+        }
     }
 
-    private CleanupJobState jobManagerRunnerFailed(JobID jobId, Throwable throwable) {
-        jobMasterFailed(jobId, throwable);
-        return CleanupJobState.LOCAL;
+    private CleanupJobState jobManagerRunnerFailed(
+            JobManagerRunnerResult jobManagerRunnerResult, Throwable throwable) {

Review Comment:
   Small question: since you're pulling `getExecutionGraphInfo().getJobId()` and `.getExecutionGraphInfo().getArchivedExecutionGraph().getState()` from the result, would it make more sense to write the type signature as:
   
   ```
   private CleanupJobState jobManagerRunnerFailed(
               ExecutionGraphInfo executionGraphInfo, Throwable throwable) {
   ```
   
   Or is passing the result in better from a readability point of view?





[GitHub] [flink] zentol commented on a diff in pull request #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19567:
URL: https://github.com/apache/flink/pull/19567#discussion_r860644917


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -659,14 +661,36 @@ private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult(
         return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo());
     }
 
-    enum CleanupJobState {
-        LOCAL,
-        GLOBAL
+    private static class CleanupJobStateWithJobStatusIfGlobal {
+
+        private static final CleanupJobStateWithJobStatusIfGlobal LOCAL =
+                new CleanupJobStateWithJobStatusIfGlobal(null);

Review Comment:
   Do we really need a nullable jobstatus in the first place?
   
   * jobManagerRunnerFailed could use initializing or failed
   * jobReachedTerminalState could use suspended
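
A minimal sketch of what this suggestion could look like, assuming the cleanup state always carries a status. `SketchJobStatus` is a simplified stand-in for Flink's `JobStatus`, and `CleanupJobStateSketch` is a hypothetical name, not the class that was merged.

```java
import java.util.Objects;

// Simplified stand-in for Flink's JobStatus; only a few values are modelled.
enum SketchJobStatus {
    INITIALIZING,
    FAILED,
    SUSPENDED,
    FINISHED
}

// Hypothetical variant of the cleanup state in which the status is never null.
final class CleanupJobStateSketch {

    private final SketchJobStatus jobStatus;
    private final boolean globalCleanup;

    // Local cleanup still records a status, e.g. SUSPENDED or INITIALIZING.
    static CleanupJobStateSketch localCleanup(SketchJobStatus jobStatus) {
        return new CleanupJobStateSketch(jobStatus, false);
    }

    static CleanupJobStateSketch globalCleanup(SketchJobStatus jobStatus) {
        return new CleanupJobStateSketch(jobStatus, true);
    }

    private CleanupJobStateSketch(SketchJobStatus jobStatus, boolean globalCleanup) {
        // Rejecting null here means the getter needs no further check.
        this.jobStatus = Objects.requireNonNull(jobStatus, "jobStatus");
        this.globalCleanup = globalCleanup;
    }

    boolean isGlobalCleanup() {
        return globalCleanup;
    }

    SketchJobStatus getJobStatus() {
        return jobStatus;
    }
}
```

With a status present in both cases, the local/global distinction has to be stored explicitly, which is why the sketch keeps a separate boolean instead of deriving it from a null check.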





[GitHub] [flink] XComp commented on a diff in pull request #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19567:
URL: https://github.com/apache/flink/pull/19567#discussion_r860671795


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -659,14 +661,36 @@ private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult(
         return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo());
     }
 
-    enum CleanupJobState {
-        LOCAL,
-        GLOBAL
+    private static class CleanupJobStateWithJobStatusIfGlobal {
+
+        private static final CleanupJobStateWithJobStatusIfGlobal LOCAL =
+                new CleanupJobStateWithJobStatusIfGlobal(null);

Review Comment:
   That's a good point. That keeps it consistent. Thanks for questioning that approach... 👍 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -659,14 +661,36 @@ private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult(
         return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo());
     }
 
-    enum CleanupJobState {
-        LOCAL,
-        GLOBAL
+    private static class CleanupJobStateWithJobStatusIfGlobal {
+
+        private static final CleanupJobStateWithJobStatusIfGlobal LOCAL =
+                new CleanupJobStateWithJobStatusIfGlobal(null);
+
+        @Nullable private final JobStatus jobStatus;
+
+        public static CleanupJobStateWithJobStatusIfGlobal globalCleanup(JobStatus jobStatus) {

Review Comment:
   hm, you're right. the not-null check should be moved into this method to make it fail early.







[GitHub] [flink] zentol commented on a diff in pull request #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19567:
URL: https://github.com/apache/flink/pull/19567#discussion_r860641365


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -659,14 +661,36 @@ private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult(
         return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo());
     }
 
-    enum CleanupJobState {
-        LOCAL,
-        GLOBAL
+    private static class CleanupJobStateWithJobStatusIfGlobal {
+
+        private static final CleanupJobStateWithJobStatusIfGlobal LOCAL =
+                new CleanupJobStateWithJobStatusIfGlobal(null);
+
+        @Nullable private final JobStatus jobStatus;
+
+        public static CleanupJobStateWithJobStatusIfGlobal globalCleanup(JobStatus jobStatus) {

Review Comment:
   strange that jobStatus isn't nullable here.







[GitHub] [flink] XComp commented on a diff in pull request #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19567:
URL: https://github.com/apache/flink/pull/19567#discussion_r858606141


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -659,14 +661,34 @@ private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult(
         return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo());
     }
 
-    enum CleanupJobState {
-        LOCAL,
-        GLOBAL
+    private static class CleanupJobStateWithJobStatusIfGlobal {
+
+        private static final CleanupJobStateWithJobStatusIfGlobal LOCAL = new CleanupJobStateWithJobStatusIfGlobal(null);
+
+        @Nullable private final JobStatus jobStatus;
+
+        public static CleanupJobStateWithJobStatusIfGlobal globalCleanup(JobStatus jobStatus) {
+            return new CleanupJobStateWithJobStatusIfGlobal(jobStatus);
+        }
+
+        private CleanupJobStateWithJobStatusIfGlobal(@Nullable JobStatus jobStatus) {
+            this.jobStatus = jobStatus;
+        }
+
+        public boolean isGlobalCleanup() {
+            return jobStatus != null;
+        }
+
+        public JobStatus getJobStatus() {
+            Preconditions.checkState(

Review Comment:
   I'm not really happy about this Precondition. But I couldn't come up with a better way to pass in the JobStatus without touching other method signatures. Do you have a better idea?





[GitHub] [flink] XComp commented on pull request #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #19567:
URL: https://github.com/apache/flink/pull/19567#issuecomment-1109682113

   I added a hotfix commit in addition to the actual change. PTAL.




[GitHub] [flink] XComp merged pull request #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

Posted by GitBox <gi...@apache.org>.
XComp merged PR #19567:
URL: https://github.com/apache/flink/pull/19567




[GitHub] [flink] XComp commented on pull request #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #19567:
URL: https://github.com/apache/flink/pull/19567#issuecomment-1110611508

   Force-pushed to fix spotless error




[GitHub] [flink] autophagy commented on a diff in pull request #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

Posted by GitBox <gi...@apache.org>.
autophagy commented on code in PR #19567:
URL: https://github.com/apache/flink/pull/19567#discussion_r860607314


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -659,14 +661,34 @@ private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult(
         return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo());
     }
 
-    enum CleanupJobState {
-        LOCAL,
-        GLOBAL
+    private static class CleanupJobStateWithJobStatusIfGlobal {
+
+        private static final CleanupJobStateWithJobStatusIfGlobal LOCAL = new CleanupJobStateWithJobStatusIfGlobal(null);
+
+        @Nullable private final JobStatus jobStatus;
+
+        public static CleanupJobStateWithJobStatusIfGlobal globalCleanup(JobStatus jobStatus) {
+            return new CleanupJobStateWithJobStatusIfGlobal(jobStatus);
+        }
+
+        private CleanupJobStateWithJobStatusIfGlobal(@Nullable JobStatus jobStatus) {
+            this.jobStatus = jobStatus;
+        }
+
+        public boolean isGlobalCleanup() {
+            return jobStatus != null;
+        }
+
+        public JobStatus getJobStatus() {
+            Preconditions.checkState(

Review Comment:
   Not really - either the precondition check happens here or it happens at the `getJobStatus` callsites. Other than a checked exception, I can't see a nice way of doing it.
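
The trade-off discussed here can be sketched as follows. This is an illustration under assumptions, not the merged code: `StatusSketch` and `NullableCleanupStateSketch` are hypothetical names, and plain Java checks are used in place of Flink's `Preconditions` helper so that the example is self-contained.

```java
import java.util.Objects;

// Simplified stand-in for Flink's JobStatus.
enum StatusSketch {
    FAILED,
    FINISHED,
    CANCELED
}

// Hypothetical nullable variant: a null status marks a local cleanup.
final class NullableCleanupStateSketch {

    static final NullableCleanupStateSketch LOCAL = new NullableCleanupStateSketch(null);

    private final StatusSketch jobStatus;

    static NullableCleanupStateSketch globalCleanup(StatusSketch jobStatus) {
        // Fail early: a missing status is rejected when the state is created.
        return new NullableCleanupStateSketch(Objects.requireNonNull(jobStatus, "jobStatus"));
    }

    private NullableCleanupStateSketch(StatusSketch jobStatus) {
        this.jobStatus = jobStatus;
    }

    boolean isGlobalCleanup() {
        return jobStatus != null;
    }

    StatusSketch getJobStatus() {
        // The state check lives in the getter so that call sites do not repeat it.
        if (jobStatus == null) {
            throw new IllegalStateException("The job status is only available for global cleanup.");
        }
        return jobStatus;
    }
}
```

Callers are expected to test `isGlobalCleanup()` before calling `getJobStatus()`; the unchecked exception only guards against a programming error.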







[GitHub] [flink] XComp commented on a diff in pull request #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19567:
URL: https://github.com/apache/flink/pull/19567#discussion_r858547701


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -652,20 +653,47 @@ private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult(
                 && executionType == ExecutionType.RECOVERY) {
             return CompletableFuture.completedFuture(
                     jobManagerRunnerFailed(
-                            jobManagerRunnerResult.getExecutionGraphInfo().getJobId(),
+                            jobManagerRunnerResult,
                             jobManagerRunnerResult.getInitializationFailure()));
         }
         return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo());
     }
 
-    enum CleanupJobState {
-        LOCAL,
-        GLOBAL
+    private static class CleanupJobState {
+
+        private final JobStatus jobStatus;
+        private final boolean globalCleanup;
+
+        public static CleanupJobState localCleanup(JobStatus jobStatus) {
+            return new CleanupJobState(jobStatus, false);
+        }
+
+        public static CleanupJobState globalCleanup(JobStatus jobStatus) {
+            return new CleanupJobState(jobStatus, true);
+        }
+
+        private CleanupJobState(JobStatus jobStatus, boolean globalCleanup) {
+            this.jobStatus = jobStatus;
+            this.globalCleanup = globalCleanup;
+        }
+
+        public boolean isGlobalCleanup() {
+            return globalCleanup;
+        }
+
+        public JobStatus getJobStatus() {
+            return jobStatus;
+        }
     }
 
-    private CleanupJobState jobManagerRunnerFailed(JobID jobId, Throwable throwable) {
-        jobMasterFailed(jobId, throwable);
-        return CleanupJobState.LOCAL;
+    private CleanupJobState jobManagerRunnerFailed(
+            JobManagerRunnerResult jobManagerRunnerResult, Throwable throwable) {

Review Comment:
   yikes, you have a point. But while looking into it I noticed that I did something stupid here: I used the `jobManagerRunnerResult` for `jobManagerRunnerFailed` in the case where `jobManagerRunnerResult` is actually `null`. I fixed this now. But I'm gonna double-check why this never came up in any test. It should have resulted in a `NullPointerException`, but it seems that all tests pass, which means that we're not covering this code path yet.





[GitHub] [flink] XComp commented on pull request #19567: [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #19567:
URL: https://github.com/apache/flink/pull/19567#issuecomment-1109626153

   The force-push also included a rebase to fix the `master` conflict resolution

