Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/15 11:08:11 UTC

[GitHub] [flink] zentol opened a new pull request, #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

zentol opened a new pull request, #19968:
URL: https://github.com/apache/flink/pull/19968

   Based on #19957.
   
   The `StopWithSavepoint` state of the adaptive scheduler has to account for a number of failure scenarios.
   
   One of these failure cases is a task failure while the savepoint goes through. This happens, for example, when an exception is thrown in `notifyCheckpointComplete()`. In FLINK-26923 the code was changed to not trigger a restart in this case and to instead hard-fail the job, on the grounds that continuing a job despite a savepoint having been created could lead to duplicate data.
   This change was done incorrectly, because it assumed that the savepoint future always completes first. However, there is no such guarantee.
   Primarily because the checkpoint coordinator (CC) does not run in the main thread, any failure happening around the time the CC completes the savepoint can end up calling into `StopWithSavepoint#onFailure` before the savepoint future has completed.
   
   This PR remedies this by forcing the handling of task failures or terminal states to wait for the savepoint to complete.
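A minimal sketch of the waiting idea using plain `java.util.concurrent`, assuming one single-threaded executor stands in for the JobMaster main thread and another for the checkpoint coordinator. The method name `onFailure` mirrors `StopWithSavepoint#onFailure`, but its signature, body and returned strings are hypothetical and not Flink's implementation: the failure is only acted on once the savepoint future has completed, and the decision then runs on the main-thread executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WaitForSavepointSketch {

    /**
     * Hypothetical failure handling: the decision is deferred until the savepoint outcome is known
     * and then runs on the main-thread executor.
     */
    static CompletableFuture<String> onFailure(
            CompletableFuture<String> savepointFuture, Throwable cause, Executor mainThread) {
        return savepointFuture.handleAsync(
                (savepointPath, savepointError) -> {
                    if (savepointError == null) {
                        // The savepoint was created despite the task failure: report it to the user.
                        return "handle failure '" + cause.getMessage()
                                + "', savepoint created at " + savepointPath;
                    }
                    // The savepoint failed as well: the generic failure case.
                    return "handle failure '" + cause.getMessage() + "', no savepoint created";
                },
                mainThread);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService mainThread = Executors.newSingleThreadExecutor(); // stand-in for the JM main thread
        ExecutorService coordinator = Executors.newSingleThreadExecutor(); // stand-in for the checkpoint coordinator
        CompletableFuture<String> savepointFuture = new CompletableFuture<>();

        // The task failure is reported before the savepoint has completed.
        CompletableFuture<String> decision =
                onFailure(savepointFuture, new Exception("task failure"), mainThread);
        System.out.println("decided before savepoint completed? " + decision.isDone());

        // The savepoint completes later, on a thread that is not the main thread.
        coordinator.submit(() -> savepointFuture.complete("/savepoints/sp-1"));

        System.out.println(decision.get(10, TimeUnit.SECONDS));
        mainThread.shutdown();
        coordinator.shutdown();
    }
}
```

Chaining on the savepoint future, rather than checking `isDone()` once, is what removes the ordering assumption: whichever RPC arrives first, the decision is only made when both pieces of information are available.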
   
   In addition, the PR refactors the state to be a little less messy. Previously, the general and the savepoint failure handling code paths both set variables that influenced each other in certain scenarios, making the code exceptionally difficult to reason about.
   This now only happens in one direction: the savepoint failure handling may be skipped if another failure has already happened.
   
   Finally the PR adds a lot of documentation for what failure scenarios exist and why things are done the way they are.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on a diff in pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901587633


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -377,6 +378,7 @@ void testOnGloballyTerminalStateWaitsForSavepointCompletion() throws Exception {
             ctx.setStopWithSavepoint(sws);
 
             sws.onGloballyTerminalState(JobStatus.FINISHED);
+            // this is a sanity check that we haven't scheduled a state transition

Review Comment:
   there's another location in `testConcurrentSavepointFailureAndGloballyTerminalStateCauseRestart` (see [line 407](https://github.com/apache/flink/blob/106b83b88a1dab577b2f60b5a9dc8052526d3820/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java#L407)) where we could add the comment.





[GitHub] [flink] XComp commented on a diff in pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901744714


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -49,19 +49,48 @@
  * of the operation) is made available via the "operationFuture" to the user. This operation is only
  * considered successfully if the "savepointFuture" completed successfully, and the job reached the
  * terminal state FINISHED.
+ *
+ * <p>This state has to cover several failure scenarios, depending on whether the savepoint
+ * succeeds/fails and the job succeeds/fails/keeps running.
+ *
+ * <ul>
+ *   <li>Savepoint succeeds, job succeeds - The happy path we like to see.
+ *   <li>Savepoint fails, job fails - The generic failure case. Something happened during
+ *       checkpointing on the TM side that also failed the task; fail the savepoint operation and
+ *       restart the job.
+ *   <li>Savepoint succeeds, job fails - Some issue occurred in notifyCheckpointComplete or during
+ *       the job shutdown. Fail the savepoint operation and job, but inform the user about the
+ *       created savepoint.
+ *   <li>Savepoint fails, job keeps running - The savepoint failed due to an error on the JM side,
+ *       before we ever triggered anything on the TM side. Fail the savepoint operation, but keep
+ *       the job running.
+ * </ul>
+ *
+ * <p>This is further complicated by this information being transmitted via 2 separate RPCs from
+ * TM->JM, with the {@code savepointFuture} not being completed in the main thread, introducing
+ * ordering/lateness issues. Be careful to not liberally use {@link Context#runIfState(State,
+ * Runnable, Duration)} because it can result in a message being lost if multiple operations are
+ * queued and the first initiates a state transition.
  */
 class StopWithSavepoint extends StateWithExecutionGraph {
 
     private final Context context;
+    /**
+     * The result future of this operation, containing the path to the savepoint. This is the future
+     * that other components (e.g., the REST API) wait for.
+     *
+     * <p>Must only be completed successfully if the savepoint was created and the job has FINISHED.
+     */
     private final CompletableFuture<String> operationFuture;
 
     private final CheckpointScheduling checkpointScheduling;
 
-    private boolean hasFullyFinished = false;
-
-    @Nullable private String savepoint = null;
-
     @Nullable private Throwable operationFailureCause;
+    private boolean hasPendingStateTransition = false;
+
+    // be careful when applying operations on this future that can trigger state transitions,
+    // as several other methods do the same and we mustn't trigger multiple transitions!
+    private final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();

Review Comment:
   > hmm if we'd move the expectFinished down a bit we'd have it covered...
   
   that sounds unrelated 🤔 
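To make the four scenarios from the Javadoc in the `StopWithSavepoint.java` hunk quoted above easier to compare, here is a hypothetical decision table in Java. The enum names and action strings are illustrative and not part of Flink; combinations the Javadoc does not list are rejected instead of guessed.

```java
public class StopWithSavepointScenarios {

    enum SavepointOutcome { SUCCEEDED, FAILED }

    enum JobOutcome { FINISHED, FAILED, KEEPS_RUNNING }

    /** Returns the handling the Javadoc describes for a (savepoint, job) combination. */
    static String handle(SavepointOutcome savepoint, JobOutcome job) {
        boolean savepointCreated = savepoint == SavepointOutcome.SUCCEEDED;
        if (savepointCreated && job == JobOutcome.FINISHED) {
            return "happy path: complete the operation with the savepoint path";
        }
        if (!savepointCreated && job == JobOutcome.FAILED) {
            return "generic failure: fail the operation and restart the job";
        }
        if (savepointCreated && job == JobOutcome.FAILED) {
            return "fail the operation and the job, but report the created savepoint";
        }
        if (!savepointCreated && job == JobOutcome.KEEPS_RUNNING) {
            return "JM-side savepoint failure: fail the operation, keep the job running";
        }
        // Combinations the Javadoc does not list are rejected rather than guessed.
        throw new IllegalArgumentException("undocumented scenario: " + savepoint + " / " + job);
    }

    public static void main(String[] args) {
        for (SavepointOutcome sp : SavepointOutcome.values()) {
            for (JobOutcome job : JobOutcome.values()) {
                try {
                    System.out.println(sp + " / " + job + " -> " + handle(sp, job));
                } catch (IllegalArgumentException e) {
                    System.out.println(sp + " / " + job + " -> " + e.getMessage());
                }
            }
        }
    }
}
```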





[GitHub] [flink] zentol commented on pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #19968:
URL: https://github.com/apache/flink/pull/19968#issuecomment-1157437217

   @flinkbot run azure




[GitHub] [flink] zentol commented on a diff in pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901375211


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -337,6 +343,70 @@ void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception {
         }
     }
 
+    @Test
+    void testOnFailureWaitsForSavepointCompletion() throws Exception {
+        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
+            CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+            StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
+            StopWithSavepoint sws =
+                    createStopWithSavepoint(
+                            ctx, mockStopWithSavepointOperations, executionGraph, savepointFuture);
+            ctx.setStopWithSavepoint(sws);
+
+            ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
+
+            sws.onFailure(new Exception("task failure"));
+            ctx.triggerExecutors();

Review Comment:
   yes I'll add a comment





[GitHub] [flink] zentol commented on a diff in pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901685682


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -49,19 +49,48 @@
  * of the operation) is made available via the "operationFuture" to the user. This operation is only
  * considered successfully if the "savepointFuture" completed successfully, and the job reached the
  * terminal state FINISHED.
+ *
+ * <p>This state has to cover several failure scenarios, depending on whether the savepoint
+ * succeeds/fails and the job succeeds/fails/keeps running.
+ *
+ * <ul>
+ *   <li>Savepoint succeeds, job succeeds - The happy path we like to see.
+ *   <li>Savepoint fails, job fails - The generic failure case. Something happened during
+ *       checkpointing on the TM side that also failed the task; fail the savepoint operation and
+ *       restart the job.
+ *   <li>Savepoint succeeds, job fails - Some issue occurred in notifyCheckpointComplete or during
+ *       the job shutdown. Fail the savepoint operation and job, but inform the user about the
+ *       created savepoint.
+ *   <li>Savepoint fails, job keeps running - The savepoint failed due to an error on the JM side,
+ *       before we ever triggered anything on the TM side. Fail the savepoint operation, but keep
+ *       the job running.
+ * </ul>
+ *
+ * <p>This is further complicated by this information being transmitted via 2 separate RPCs from
+ * TM->JM, with the {@code savepointFuture} not being completed in the main thread, introducing
+ * ordering/lateness issues. Be careful to not liberally use {@link Context#runIfState(State,
+ * Runnable, Duration)} because it can result in a message being lost if multiple operations are
+ * queued and the first initiates a state transition.
  */
 class StopWithSavepoint extends StateWithExecutionGraph {
 
     private final Context context;
+    /**
+     * The result future of this operation, containing the path to the savepoint. This is the future
+     * that other components (e.g., the REST API) wait for.
+     *
+     * <p>Must only be completed successfully if the savepoint was created and the job has FINISHED.
+     */
     private final CompletableFuture<String> operationFuture;
 
     private final CheckpointScheduling checkpointScheduling;
 
-    private boolean hasFullyFinished = false;
-
-    @Nullable private String savepoint = null;
-
     @Nullable private Throwable operationFailureCause;
+    private boolean hasPendingStateTransition = false;
+
+    // be careful when applying operations on this future that can trigger state transitions,
+    // as several other methods do the same and we mustn't trigger multiple transitions!
+    private final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();

Review Comment:
   I'd like to avoid having to specify whether something is completed in the main thread or not; that should be the base assumption for any `CompletableFuture` _state_ we have.
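The Javadoc in the hunk above warns that liberal use of `Context#runIfState(State, Runnable, Duration)` can lose a message when several operations are queued and the first one initiates a state transition. A minimal sketch of that effect, assuming a simplified context with a plain queue acting as the main thread; `Context`, `State`, `runQueued` and the extra `label` parameter are hypothetical, and the `Duration` delay of the real method is omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class RunIfStateSketch {

    enum State { STOP_WITH_SAVEPOINT, RESTARTING }

    /** Hypothetical stand-in for the scheduler context: a main-thread queue plus the current state. */
    static class Context {
        State currentState = State.STOP_WITH_SAVEPOINT;
        final Queue<Runnable> mainThreadQueue = new ArrayDeque<>();
        final List<String> dropped = new ArrayList<>();

        /** Queues the action; when it runs, it is skipped if the state has changed in the meantime. */
        void runIfState(State expectedState, Runnable action, String label) {
            mainThreadQueue.add(
                    () -> {
                        if (currentState == expectedState) {
                            action.run();
                        } else {
                            dropped.add(label); // recorded here only for illustration
                        }
                    });
        }

        /** Runs everything queued so far, in order. */
        void runQueued() {
            while (!mainThreadQueue.isEmpty()) {
                mainThreadQueue.poll().run();
            }
        }
    }

    public static void main(String[] args) {
        Context ctx = new Context();
        // Two events are queued while the state is still STOP_WITH_SAVEPOINT.
        ctx.runIfState(State.STOP_WITH_SAVEPOINT, () -> ctx.currentState = State.RESTARTING, "task failure");
        ctx.runIfState(State.STOP_WITH_SAVEPOINT, () -> {}, "savepoint completion");

        ctx.runQueued();
        // The first action changed the state, so the second one is never executed.
        System.out.println("state=" + ctx.currentState + ", dropped=" + ctx.dropped);
    }
}
```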





[GitHub] [flink] zentol commented on a diff in pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r900223182


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -132,6 +146,7 @@ public void onLeave(Class<? extends State> newState) {
 
     @Override
     public void cancel() {
+        operationFailureCause = new FlinkException("The job was cancelled.");

Review Comment:
   not waiting can cause bad things to happen when the savepoint completes during cancellation





[GitHub] [flink] zentol commented on a diff in pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901780482


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -49,19 +49,48 @@
  * of the operation) is made available via the "operationFuture" to the user. This operation is only
  * considered successfully if the "savepointFuture" completed successfully, and the job reached the
  * terminal state FINISHED.
+ *
+ * <p>This state has to cover several failure scenarios, depending on whether the savepoint
+ * succeeds/fails and the job succeeds/fails/keeps running.
+ *
+ * <ul>
+ *   <li>Savepoint succeeds, job succeeds - The happy path we like to see.
+ *   <li>Savepoint fails, job fails - The generic failure case. Something happened during
+ *       checkpointing on the TM side that also failed the task; fail the savepoint operation and
+ *       restart the job.
+ *   <li>Savepoint succeeds, job fails - Some issue occurred in notifyCheckpointComplete or during
+ *       the job shutdown. Fail the savepoint operation and job, but inform the user about the
+ *       created savepoint.
+ *   <li>Savepoint fails, job keeps running - The savepoint failed due to an error on the JM side,
+ *       before we ever triggered anything on the TM side. Fail the savepoint operation, but keep
+ *       the job running.
+ * </ul>
+ *
+ * <p>This is further complicated by this information being transmitted via 2 separate RPCs from
+ * TM->JM, with the {@code savepointFuture} not being completed in the main thread, introducing
+ * ordering/lateness issues. Be careful to not liberally use {@link Context#runIfState(State,
+ * Runnable, Duration)} because it can result in a message being lost if multiple operations are
+ * queued and the first initiates a state transition.
  */
 class StopWithSavepoint extends StateWithExecutionGraph {
 
     private final Context context;
+    /**
+     * The result future of this operation, containing the path to the savepoint. This is the future
+     * that other components (e.g., the REST API) wait for.
+     *
+     * <p>Must only be completed successfully if the savepoint was created and the job has FINISHED.
+     */
     private final CompletableFuture<String> operationFuture;
 
     private final CheckpointScheduling checkpointScheduling;
 
-    private boolean hasFullyFinished = false;
-
-    @Nullable private String savepoint = null;
-
     @Nullable private Throwable operationFailureCause;
+    private boolean hasPendingStateTransition = false;
+
+    // be careful when applying operations on this future that can trigger state transitions,
+    // as several other methods do the same and we mustn't trigger multiple transitions!
+    private final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();

Review Comment:
   it is; a bit. I think we're better off merging them than covering 2 different aspects via subtle differences in the test.





[GitHub] [flink] zentol commented on a diff in pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r900209065


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -132,6 +146,7 @@ public void onLeave(Class<? extends State> newState) {
 
     @Override
     public void cancel() {
+        operationFailureCause = new FlinkException("The job was cancelled.");

Review Comment:
   document why we don't wait for the future here





[GitHub] [flink] XComp commented on a diff in pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901097551


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -57,11 +79,12 @@ class StopWithSavepoint extends StateWithExecutionGraph {
 
     private final CheckpointScheduling checkpointScheduling;
 
-    private boolean hasFullyFinished = false;
-
-    @Nullable private String savepoint = null;
-
     @Nullable private Throwable operationFailureCause;
+    private boolean hasPendingStateTransition = false;
+
+    // be careful when applying operations on this future that can trigger state transitions,
+    // as several other methods do the same and we mustn't trigger multiple transitions!
+    private final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();

Review Comment:
   May we add a comment to the `operationFuture` as well, to distinguish the two from each other? The purpose of the `operationFuture` is, as far as I understand, to have a future that completes depending on the internal state as soon as the subsequent state transition happens.
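A minimal sketch of the distinction being asked about, assuming a simplified model rather than Flink's actual class: `internalSavepointFuture` only mirrors the savepoint outcome and may complete on any thread, while `operationFuture` is what callers such as the REST API wait on. Here it is completed when the state is left, and successfully only if the savepoint exists and the job reached FINISHED, matching the contract documented for `operationFuture` elsewhere in this thread. `StopWithSavepointModel`, the `onLeave(boolean)` signature and the exception message are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

public class OperationFutureSketch {

    /** Hypothetical model of the two futures discussed in the review; not Flink's class. */
    static class StopWithSavepointModel {
        /** User-facing result (e.g. awaited by the REST API); completed only when the state is left. */
        final CompletableFuture<String> operationFuture = new CompletableFuture<>();
        /** Internal mirror of the savepoint outcome; may be completed from any thread. */
        final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();
        Throwable operationFailureCause;

        /** Completes operationFuture successfully only if the savepoint exists and the job FINISHED. */
        void onLeave(boolean jobFinished) {
            boolean savepointCreated =
                    internalSavepointFuture.isDone()
                            && !internalSavepointFuture.isCompletedExceptionally();
            if (jobFinished && savepointCreated) {
                operationFuture.complete(internalSavepointFuture.join());
            } else {
                operationFuture.completeExceptionally(
                        new IllegalStateException(
                                "Stop with savepoint operation could not be completed.",
                                operationFailureCause));
            }
        }
    }

    public static void main(String[] args) {
        StopWithSavepointModel finished = new StopWithSavepointModel();
        finished.internalSavepointFuture.complete("/savepoints/sp-1");
        finished.onLeave(true);
        System.out.println(finished.operationFuture.join());

        StopWithSavepointModel failed = new StopWithSavepointModel();
        failed.internalSavepointFuture.complete("/savepoints/sp-2");
        failed.operationFailureCause = new RuntimeException("task failure after savepoint");
        failed.onLeave(false);
        System.out.println("failed: " + failed.operationFuture.isCompletedExceptionally());
    }
}
```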



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -337,6 +343,70 @@ void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception {
         }
     }
 
+    @Test
+    void testOnFailureWaitsForSavepointCompletion() throws Exception {
+        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
+            CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+            StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
+            StopWithSavepoint sws =
+                    createStopWithSavepoint(
+                            ctx, mockStopWithSavepointOperations, executionGraph, savepointFuture);
+            ctx.setStopWithSavepoint(sws);
+
+            ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
+
+            sws.onFailure(new Exception("task failure"));
+            ctx.triggerExecutors();

Review Comment:
   Should we add a comment here (and in the other two test cases) indicating that we don't expect anything to happen at this point, because the `onFailure` call should not have triggered a state change yet? That would add a bit of context, since the test would also succeed without this line.
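For readers unfamiliar with the test utilities, a minimal sketch of why the extra `triggerExecutors()` call works as a sanity check, assuming a manual executor that only runs queued tasks when explicitly triggered. `ManualExecutor` and `triggerAll` are hypothetical stand-ins for what `MockStopWithSavepointContext#triggerExecutors` does, not Flink's test classes: before the savepoint completes, triggering runs nothing; only afterwards does the chained restart execute.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class ManualExecutorSanityCheck {

    /** Hypothetical manual executor: tasks run only when triggerAll() is called. */
    static class ManualExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();

        @Override
        public void execute(Runnable task) {
            tasks.add(task);
        }

        /** Runs all queued tasks and returns how many ran. */
        int triggerAll() {
            int ran = 0;
            while (!tasks.isEmpty()) {
                tasks.poll().run();
                ran++;
            }
            return ran;
        }
    }

    public static void main(String[] args) {
        ManualExecutor mainThread = new ManualExecutor();
        CompletableFuture<String> savepointFuture = new CompletableFuture<>();
        String[] state = {"StopWithSavepoint"};

        // onFailure-like handling: the restart is chained on the savepoint outcome.
        savepointFuture.whenCompleteAsync((path, error) -> state[0] = "Restarting", mainThread);

        // Sanity check: nothing is scheduled yet, so triggering the executor changes nothing.
        int ranBefore = mainThread.triggerAll();
        System.out.println(ranBefore + " task(s) ran, state=" + state[0]);

        savepointFuture.complete("/savepoints/sp-1");
        int ranAfter = mainThread.triggerAll();
        System.out.println(ranAfter + " task(s) ran, state=" + state[0]);
    }
}
```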



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -82,7 +82,9 @@ void testJobFailed() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph mockExecutionGraph =
                     new StateTrackingMockExecutionGraph();
-            StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph);
+            final CompletableFuture<String> savepointFuture = CompletableFuture.completedFuture("");

Review Comment:
   I guess we should extend the test name here to something like `testJobFailedWithSavepointCreationSuccessful` to better distinguish the test from the other test cases.
   
   Additionally, we should verify that a `StopWithSavepointStopException` is part of the cause for this specific test case, shouldn't we?
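A minimal sketch of the suggested cause-chain assertion in plain Java. `SavepointCreatedException` is a hypothetical stand-in for `StopWithSavepointStopException`, and `findCause` is a hand-written helper; Flink's `org.apache.flink.util.ExceptionUtils#findThrowable` performs a similar lookup.

```java
import java.util.Optional;

public class CauseChainCheck {

    /** Hypothetical exception used to enrich a job failure with the path of the created savepoint. */
    static class SavepointCreatedException extends Exception {
        final String savepointPath;

        SavepointCreatedException(String savepointPath, Throwable cause) {
            super("Savepoint " + savepointPath + " was created, but the job failed afterwards.", cause);
            this.savepointPath = savepointPath;
        }
    }

    /** Returns the first throwable of the given type in the cause chain, if any. */
    static <T extends Throwable> Optional<T> findCause(Throwable throwable, Class<T> type) {
        for (Throwable t = throwable; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return Optional.of(type.cast(t));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Throwable taskFailure = new RuntimeException("exception in notifyCheckpointComplete");
        Throwable reported =
                new Exception("Job failed", new SavepointCreatedException("/savepoints/sp-1", taskFailure));

        // What the suggested assertion would check: the enrichment is part of the cause chain.
        System.out.println(
                findCause(reported, SavepointCreatedException.class)
                        .map(e -> e.savepointPath)
                        .orElse("not found"));
    }
}
```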



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -82,7 +82,9 @@ void testJobFailed() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph mockExecutionGraph =
                     new StateTrackingMockExecutionGraph();
-            StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph);
+            final CompletableFuture<String> savepointFuture = CompletableFuture.completedFuture("");

Review Comment:
   > needs another explicit test for job fails + successful savepoint
   
   is that the test you're referring to in your [comment above](https://github.com/apache/flink/pull/19968#pullrequestreview-1010663103)?
   
   `testJobFailed` pretty much covers the scenario, doesn't it?





[GitHub] [flink] alpinegizmo commented on pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
alpinegizmo commented on PR #19968:
URL: https://github.com/apache/flink/pull/19968#issuecomment-1157601440

   Overall this is challenging for me to evaluate, as I'm unfamiliar with this part of Flink.
   
   I agree that the new design is more straightforward, and the comments are quite helpful. 
   
   What I was looking for, but found it difficult to be sure about, is thorough test coverage.




[GitHub] [flink] zentol commented on a diff in pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901688321


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -49,19 +49,48 @@
  * of the operation) is made available via the "operationFuture" to the user. This operation is only
  * considered successfully if the "savepointFuture" completed successfully, and the job reached the
  * terminal state FINISHED.
+ *
+ * <p>This state has to cover several failure scenarios, depending on whether the savepoint
+ * succeeds/fails and the job succeeds/fails/keeps running.
+ *
+ * <ul>
+ *   <li>Savepoint succeeds, job succeeds - The happy path we like to see.
+ *   <li>Savepoint fails, job fails - The generic failure case. Something happened during
+ *       checkpointing on the TM side that also failed the task; fail the savepoint operation and
+ *       restart the job.
+ *   <li>Savepoint succeeds, job fails - Some issue occurred in notifyCheckpointComplete or during
+ *       the job shutdown. Fail the savepoint operation and job, but inform the user about the
+ *       created savepoint.
+ *   <li>Savepoint fails, job keeps running - The savepoint failed due to an error on the JM side,
+ *       before we ever triggered anything on the TM side. Fail the savepoint operation, but keep
+ *       the job running.
+ * </ul>
+ *
+ * <p>This is further complicated by this information being transmitted via 2 separate RPCs from
+ * TM->JM, with the {@code savepointFuture} not being completed in the main thread, introducing
+ * ordering/lateness issues. Be careful to not liberally use {@link Context#runIfState(State,
+ * Runnable, Duration)} because it can result in a message being lost if multiple operations are
+ * queued and the first initiates a state transition.
  */
 class StopWithSavepoint extends StateWithExecutionGraph {
 
     private final Context context;
+    /**
+     * The result future of this operation, containing the path to the savepoint. This is the future
+     * that other components (e.g., the REST API) wait for.
+     *
+     * <p>Must only be completed successfully if the savepoint was created and the job has FINISHED.
+     */
     private final CompletableFuture<String> operationFuture;
 
     private final CheckpointScheduling checkpointScheduling;
 
-    private boolean hasFullyFinished = false;
-
-    @Nullable private String savepoint = null;
-
     @Nullable private Throwable operationFailureCause;
+    private boolean hasPendingStateTransition = false;
+
+    // be careful when applying operations on this future that can trigger state transitions,
+    // as several other methods do the same and we mustn't trigger multiple transitions!
+    private final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();

Review Comment:
   hmm if we'd move the expectFinished down a bit we'd have it covered...





[GitHub] [flink] zentol commented on a diff in pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901681503


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -336,17 +342,83 @@ public void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception
             executionGraph.registerExecution(execution);
             TaskExecutionStateTransition taskExecutionStateTransition =
                     ExecutingTest.createFailingStateTransition(execution.getAttemptId(), exception);
-            assertThat(sws.updateTaskExecutionState(taskExecutionStateTransition), is(true));
+            assertThat(sws.updateTaskExecutionState(taskExecutionStateTransition)).isTrue();
+        }
+    }
+
+    @Test
+    void testOnFailureWaitsForSavepointCompletion() throws Exception {
+        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
+            CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+            StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
+            StopWithSavepoint sws =
+                    createStopWithSavepoint(
+                            ctx, mockStopWithSavepointOperations, executionGraph, savepointFuture);
+            ctx.setStopWithSavepoint(sws);
+
+            ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
+
+            sws.onFailure(new Exception("task failure"));
+            // this is a sanity check that we haven't scheduled a state transition
+            ctx.triggerExecutors();
+
+            ctx.setExpectRestarting(assertNonNull());
+            savepointFuture.complete(SAVEPOINT_PATH);
+            ctx.triggerExecutors();
+        }
+    }
+
+    @Test
+    void testOnGloballyTerminalStateWaitsForSavepointCompletion() throws Exception {

Review Comment:
   it's quite similar; `testFinishedOnSuccessfulStopWithSavepoint` is purely about the functional aspect, whereas this one is specifically about the waiting.





[GitHub] [flink] zentol commented on pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #19968:
URL: https://github.com/apache/flink/pull/19968#issuecomment-1157673594

   The tests for the savepoint operations are scattered around quite a bit.
   We unfortunately can't fully cover it in the `StopWithSavepointTest` because that requires an actual execution graph. Creating that ourselves isn't really an option (because there are barely any contracts; everything just relies on existing behavior of the scheduler), and we also lack good test utils. Moving away from the ExecutionGraph, while technically possible, can't be done quickly because so many re-used components expect an execution graph.
   
   The waiting for the savepoint completion in `onFailure`/`onGloballyTerminalState` is covered by the newly added cases in `StopWithSavepointTest`.
   
   Not accidentally triggering 2 state transitions from the state is now enforced by 71f72cf57d820ed62560f07f62259408e3a18b52; this on its own would've failed tests in `StopWithSavepointTest`, like `testJobFailedAndSavepointOperationFails`. We likely would've noticed the issue sooner if we had had this earlier.
   
   As for other pre-existing tests:
   
   The `AdaptiveSchedulerTest` contains tests for the proper archiving of errors that occurred during `StopWithSavepoint`. These make sure we don't accidentally drop task failures.
   
   The `AdaptiveSchedulerITCase` contains high-level tests for the happy path and certain errors on the TM side. These make sure the savepoint operation does complete if a task failed.
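
The "at most one state transition" rule described above can be sketched as a guard flag, loosely modeled on the `hasPendingStateTransition` field that appears in the `StopWithSavepoint` diff in this thread. This is a hypothetical, heavily simplified sketch, not what commit 71f72cf57d820ed62560f07f62259408e3a18b52 actually does; `SingleTransitionState` and `transitionTo` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: a state that allows at most one state transition and fails fast
 * if a second one is requested, instead of silently scheduling both.
 */
final class SingleTransitionState {
    private boolean hasPendingStateTransition = false;
    private final List<String> requestedTransitions = new ArrayList<>();

    /** Schedules a transition to {@code targetState}; a second call is treated as a bug. */
    void transitionTo(String targetState) {
        if (hasPendingStateTransition) {
            throw new IllegalStateException(
                    "A state transition was already scheduled; refusing to also go to "
                            + targetState);
        }
        hasPendingStateTransition = true;
        requestedTransitions.add(targetState);
    }

    /** Returns the transitions that were accepted, in order. */
    List<String> requestedTransitions() {
        return requestedTransitions;
    }
}
```

With such a guard, a second `transitionTo` call surfaces immediately as an `IllegalStateException` in a unit test rather than as a lost or duplicated transition at runtime.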
   




[GitHub] [flink] zentol commented on a diff in pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901377456


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -82,7 +82,9 @@ void testJobFailed() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph mockExecutionGraph =
                     new StateTrackingMockExecutionGraph();
-            StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph);
+            final CompletableFuture<String> savepointFuture = CompletableFuture.completedFuture("");

Review Comment:
   > Additionally, we should verify that a StopWithSavepointStopException is part of the cause for this specific test case, shouldn't we?
   
   > is that the test you're referring to in your https://github.com/apache/flink/pull/19968#pullrequestreview-1010663103?
   
   Yes, that's what I meant. The enrichment isn't explicitly covered afaict.
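
Verifying the enrichment comes down to checking that the expected exception type appears somewhere in the failure's cause chain. A minimal plain-Java sketch, assuming a hypothetical stand-in exception type (`SavepointStopMarkerException`) rather than the real Flink class; inside Flink, `org.apache.flink.util.ExceptionUtils.findThrowable` serves a similar purpose.

```java
import java.util.Optional;

/** Hypothetical stand-in for the exception a failed stop-with-savepoint operation is enriched with. */
final class SavepointStopMarkerException extends Exception {
    SavepointStopMarkerException(String savepointPath, Throwable cause) {
        super("Savepoint " + savepointPath + " was created, but the job did not finish", cause);
    }
}

final class CauseChains {
    private CauseChains() {}

    /** Returns the first throwable of {@code type} in the cause chain of {@code throwable}, if any. */
    static <T extends Throwable> Optional<T> findInCauseChain(Throwable throwable, Class<T> type) {
        Throwable current = throwable;
        // Bound the walk so that a cyclic cause chain cannot loop forever.
        for (int depth = 0; current != null && depth < 100; depth++) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }
}
```

In an AssertJ-based test, the check could then read `assertThat(CauseChains.findInCauseChain(cause, SavepointStopMarkerException.class)).isPresent()`.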





[GitHub] [flink] XComp commented on a diff in pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901587633


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -377,6 +378,7 @@ void testOnGloballyTerminalStateWaitsForSavepointCompletion() throws Exception {
             ctx.setStopWithSavepoint(sws);
 
             sws.onGloballyTerminalState(JobStatus.FINISHED);
+            // this is a sanity check that we haven't scheduled a state transition

Review Comment:
   there's another location in `testConcurrentSavepointFailureAndGloballyTerminalStateCauseRestart` (see [line 407](https://github.com/apache/flink/blob/106b83b88a1dab577b2f60b5a9dc8052526d3820/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java#L407)).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -49,19 +49,48 @@
  * of the operation) is made available via the "operationFuture" to the user. This operation is only
  * considered successfully if the "savepointFuture" completed successfully, and the job reached the
  * terminal state FINISHED.
+ *
+ * <p>This state has to cover several failure scenarios, depending on whether the savepoint
+ * succeeds/fails and the job succeeds/fails/keeps running.
+ *
+ * <ul>
+ *   <li>Savepoint succeeds, job succeeds - The happy path we like to see.
+ *   <li>Savepoint fails, job fails - The generic failure case. Something happened during
+ *       checkpointing on the TM side that also failed the task; fail the savepoint operation and
+ *       restart the job.
+ *   <li>Savepoint succeeds, job fails - Some issue occurred in notifyCheckpointComplete or during
+ *       the job shutdown. Fail the savepoint operation and job, but inform the user about the
+ *       created savepoint.
+ *   <li>Savepoint fails, job keeps running - The savepoint failed due to an error on the JM side,
+ *       before we ever triggered anything on the TM side. Fail the savepoint operation, but keep
+ *       the job running.
+ * </ul>
+ *
+ * <p>This is further complicated by this information being transmitted via 2 separate RPCs from
+ * TM->JM, with the {@code savepointFuture} not being completed in the main thread, introducing
+ * ordering/lateness issues. Be careful to not liberally use {@link Context#runIfState(State,
+ * Runnable, Duration)} because it can result in a message being lost if multiple operations are
+ * queued and the first initiates a state transition.
  */
 class StopWithSavepoint extends StateWithExecutionGraph {
 
     private final Context context;
+    /**
+     * The result future of this operation, containing the path to the savepoint. This is the future
+     * that other components (e.g., the REST API) wait for.
+     *
+     * <p>Must only be completed successfully if the savepoint was created and the job has FINISHED.
+     */
     private final CompletableFuture<String> operationFuture;
 
     private final CheckpointScheduling checkpointScheduling;
 
-    private boolean hasFullyFinished = false;
-
-    @Nullable private String savepoint = null;
-
     @Nullable private Throwable operationFailureCause;
+    private boolean hasPendingStateTransition = false;
+
+    // be careful when applying operations on this future that can trigger state transitions,
+    // as several other methods do the same and we mustn't trigger multiple transitions!
+    private final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();

Review Comment:
   nit: I'm wondering whether we can make this future more descriptive by calling it something like `mainThreadBackedSavepointFuture` or `terminateInMainThreadSavepointFuture`... WDYT? 🤔 
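
The four scenarios listed in the `StopWithSavepoint` class Javadoc can be read as a decision table over the savepoint result and the job's state. The sketch encodes that table; `JobState`, `Outcome`, and `StopWithSavepointDecision` are hypothetical names, and combinations the Javadoc does not list are returned as `Optional.empty()` rather than guessed.

```java
import java.util.Optional;

/** Hypothetical, simplified view of what the job is doing when the savepoint result is known. */
enum JobState { FINISHED, FAILED, RUNNING }

/** Hypothetical outcomes, one per scenario in the Javadoc. */
enum Outcome {
    /** Savepoint succeeds, job succeeds: complete the operation with the savepoint path. */
    COMPLETE_OPERATION,
    /** Savepoint fails, job fails: fail the operation and restart the job. */
    FAIL_OPERATION_AND_RESTART,
    /** Savepoint succeeds, job fails: fail operation and job, but report the created savepoint. */
    FAIL_OPERATION_AND_JOB_REPORT_SAVEPOINT,
    /** Savepoint fails, job keeps running: fail the operation only. */
    FAIL_OPERATION_KEEP_RUNNING
}

final class StopWithSavepointDecision {
    private StopWithSavepointDecision() {}

    static Optional<Outcome> decide(boolean savepointSucceeded, JobState jobState) {
        if (savepointSucceeded && jobState == JobState.FINISHED) {
            return Optional.of(Outcome.COMPLETE_OPERATION);
        }
        if (!savepointSucceeded && jobState == JobState.FAILED) {
            return Optional.of(Outcome.FAIL_OPERATION_AND_RESTART);
        }
        if (savepointSucceeded && jobState == JobState.FAILED) {
            return Optional.of(Outcome.FAIL_OPERATION_AND_JOB_REPORT_SAVEPOINT);
        }
        if (!savepointSucceeded && jobState == JobState.RUNNING) {
            return Optional.of(Outcome.FAIL_OPERATION_KEEP_RUNNING);
        }
        // Not one of the four listed scenarios (e.g. savepoint done but job still running).
        return Optional.empty();
    }
}
```

The table alone is not sufficient: since the savepoint result and the job's state arrive via separate RPCs, a decision for a failed job may only be taken once the savepoint result is actually known, which is the ordering this PR enforces.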



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -336,17 +342,83 @@ public void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception
             executionGraph.registerExecution(execution);
             TaskExecutionStateTransition taskExecutionStateTransition =
                     ExecutingTest.createFailingStateTransition(execution.getAttemptId(), exception);
-            assertThat(sws.updateTaskExecutionState(taskExecutionStateTransition), is(true));
+            assertThat(sws.updateTaskExecutionState(taskExecutionStateTransition)).isTrue();
+        }
+    }
+
+    @Test
+    void testOnFailureWaitsForSavepointCompletion() throws Exception {
+        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
+            CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+            StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
+            StopWithSavepoint sws =
+                    createStopWithSavepoint(
+                            ctx, mockStopWithSavepointOperations, executionGraph, savepointFuture);
+            ctx.setStopWithSavepoint(sws);
+
+            ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
+
+            sws.onFailure(new Exception("task failure"));
+            // this is a sanity check that we haven't scheduled a state transition
+            ctx.triggerExecutors();
+
+            ctx.setExpectRestarting(assertNonNull());
+            savepointFuture.complete(SAVEPOINT_PATH);
+            ctx.triggerExecutors();
+        }
+    }
+
+    @Test
+    void testOnGloballyTerminalStateWaitsForSavepointCompletion() throws Exception {

Review Comment:
   Isn't that test testing the same as `testFinishedOnSuccessfulStopWithSavepoint` (first test in `StopWithSavepointTest`)?
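
The waiting behavior exercised by `testOnFailureWaitsForSavepointCompletion` can be illustrated without Flink: failure handling is chained onto the savepoint future and dispatched to a manually drained "main thread" executor, so draining the executor before the future completes must not produce a transition. `ManualExecutor` and `WaitingState` are hypothetical stand-ins for the test context's executors and the `StopWithSavepoint` state, and the recorded strings are placeholders rather than the real target states.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for a main-thread executor that a test drains explicitly. */
final class ManualExecutor implements Executor {
    private final Queue<Runnable> queue = new ArrayDeque<>();

    @Override
    public void execute(Runnable command) {
        queue.add(command);
    }

    /** Runs queued tasks until the queue is empty, including tasks queued while draining. */
    void triggerAll() {
        Runnable next;
        while ((next = queue.poll()) != null) {
            next.run();
        }
    }
}

/** Hypothetical, heavily simplified state that defers failure handling until the savepoint is done. */
final class WaitingState {
    private final CompletableFuture<String> savepointFuture;
    private final Executor mainThreadExecutor;
    private String scheduledTransition; // stays null until failure handling has run

    WaitingState(CompletableFuture<String> savepointFuture, Executor mainThreadExecutor) {
        this.savepointFuture = savepointFuture;
        this.mainThreadExecutor = mainThreadExecutor;
    }

    void onFailure(Throwable cause) {
        // Do not decide yet: the savepoint may still complete outside the main thread.
        // (The cause is ignored in this sketch.)
        savepointFuture.handleAsync(
                (path, savepointError) -> {
                    scheduledTransition =
                            savepointError == null
                                    ? "failure handled; savepoint=" + path
                                    : "failure handled; savepoint failed";
                    return null;
                },
                mainThreadExecutor);
    }

    String scheduledTransition() {
        return scheduledTransition;
    }
}
```

Draining the executor before completing the future mirrors the "sanity check" `ctx.triggerExecutors()` call in the test: nothing may happen until the savepoint future is done.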



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -145,6 +145,16 @@ public void onLeave(Class<? extends State> newState) {
     }
 
     @Override

Review Comment:
   nit: shouldn't we put the JavaDoc on top of the `@Override` annotation?





[GitHub] [flink] zentol merged pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
zentol merged PR #19968:
URL: https://github.com/apache/flink/pull/19968




[GitHub] [flink] zentol commented on a diff in pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901485771


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -82,7 +82,9 @@ void testJobFailed() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph mockExecutionGraph =
                     new StateTrackingMockExecutionGraph();
-            StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph);
+            final CompletableFuture<String> savepointFuture = CompletableFuture.completedFuture("");

Review Comment:
   > testJobFailed pretty much covers the scenario, doesn't it?
   
   It's close to covering it; it just needs a one-line adjustment :)





[GitHub] [flink] flinkbot commented on pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19968:
URL: https://github.com/apache/flink/pull/19968#issuecomment-1156339819

   ## CI report:
   
   * 5efc07855f42f9f39d8b01c385b6ae58df06f233 UNKNOWN
   
   Bot commands:
   The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build




[GitHub] [flink] zentol commented on pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #19968:
URL: https://github.com/apache/flink/pull/19968#issuecomment-1158571238

   @flinkbot run azure

