Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/19 12:23:09 UTC

[GitHub] [flink] XComp commented on a diff in pull request #19968: [FLINK-27972][coordination] Wait until savepoint operation is complete

XComp commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901097551


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -57,11 +79,12 @@ class StopWithSavepoint extends StateWithExecutionGraph {
 
     private final CheckpointScheduling checkpointScheduling;
 
-    private boolean hasFullyFinished = false;
-
-    @Nullable private String savepoint = null;
-
     @Nullable private Throwable operationFailureCause;
+    private boolean hasPendingStateTransition = false;
+
+    // be careful when applying operations on this future that can trigger state transitions,
+    // as several other methods do the same and we mustn't trigger multiple transitions!
+    private final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();

Review Comment:
   Could we add a comment to the `operationFuture` as well, to distinguish the two futures from each other? As far as I understand, the purpose of the `operationFuture` is to provide a future that completes, depending on the internal state, as soon as the subsequent state transition happens.
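   To make the distinction concrete, here is a minimal sketch of how the two fields could be documented, assuming a simplified state machine whose callbacks all run on one (main) thread. The class `StopWithSavepointFuturesSketch`, its methods, and the state names are hypothetical and not Flink's actual `StopWithSavepoint` code. It only models the understanding above: `internalSavepointFuture` tracks the savepoint itself, `operationFuture` completes together with the subsequent state transition, and a `hasPendingStateTransition` guard makes sure only one callback actually transitions.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical, simplified model of the two futures; not Flink's StopWithSavepoint.
 * Assumes all callbacks run on a single thread, so no synchronization is used.
 */
public class StopWithSavepointFuturesSketch {

    /**
     * Internal: completed once the savepoint has been written (or has failed).
     * Several callbacks react to it and each may want to change the state, so
     * only the first one is allowed to actually trigger a transition.
     */
    final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();

    /**
     * External: returned to the caller of stop-with-savepoint. It completes only
     * when this state hands over to the next one, with the savepoint path on
     * success or exceptionally otherwise.
     */
    final CompletableFuture<String> operationFuture = new CompletableFuture<>();

    private boolean hasPendingStateTransition = false;
    String currentState = "StopWithSavepoint";

    /** All tasks finished: move on once the savepoint outcome is known. */
    void onGloballyTerminalState() {
        internalSavepointFuture.whenComplete(
                (path, failure) -> {
                    if (failure == null) {
                        transition("Finished", path, null);
                    } else {
                        transition("Failed", null, failure);
                    }
                });
    }

    /** Task failure: wait for the savepoint operation before restarting. */
    void onFailure(Throwable cause) {
        internalSavepointFuture.whenComplete(
                (path, ignored) -> transition("Restarting", null, cause));
    }

    private void transition(String targetState, String savepointPath, Throwable cause) {
        if (hasPendingStateTransition) {
            return; // another callback already triggered the transition
        }
        hasPendingStateTransition = true;
        currentState = targetState;
        if (cause == null) {
            operationFuture.complete(savepointPath);
        } else {
            operationFuture.completeExceptionally(cause);
        }
    }
}
```

   With this split, callers only ever observe `operationFuture`, so they cannot react to a savepoint that exists while the state machine is still about to restart.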



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -337,6 +343,70 @@ void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception {
         }
     }
 
+    @Test
+    void testOnFailureWaitsForSavepointCompletion() throws Exception {
+        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
+            CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+            StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
+            StopWithSavepoint sws =
+                    createStopWithSavepoint(
+                            ctx, mockStopWithSavepointOperations, executionGraph, savepointFuture);
+            ctx.setStopWithSavepoint(sws);
+
+            ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
+
+            sws.onFailure(new Exception("task failure"));
+            ctx.triggerExecutors();

Review Comment:
   Should we add a comment here (and in the other two test cases) explaining that we don't expect anything to happen at this point because the `onFailure` call should have triggered a state change? That would add a bit of context, since the test would also succeed without this line.
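   A minimal, self-contained sketch of what such a comment could look like, assuming a manually triggered executor in place of `MockStopWithSavepointContext`. The classes `ManualExecutor` and `State` and the method `runScenario` are hypothetical. The first `triggerAll()` call plays the role of `ctx.triggerExecutors()`: it runs whatever is currently queued, and the comment spells out that no transition is expected at that point.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class OnFailureWaitsForSavepointSketch {

    /** Hypothetical executor that only runs queued tasks when explicitly triggered. */
    static final class ManualExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();

        @Override
        public void execute(Runnable task) {
            tasks.add(task);
        }

        void triggerAll() {
            while (!tasks.isEmpty()) {
                tasks.poll().run();
            }
        }
    }

    /** Hypothetical state that defers failure handling until the savepoint completes. */
    static final class State {
        final CompletableFuture<String> savepointFuture = new CompletableFuture<>();
        final ManualExecutor executor = new ManualExecutor();
        String transitionedTo = null;

        void onFailure(Throwable cause) {
            savepointFuture.whenCompleteAsync(
                    (path, failure) -> transitionedTo = "Restarting", executor);
        }
    }

    static String runScenario() {
        State state = new State();
        state.onFailure(new Exception("task failure"));

        // Run everything that is currently queued. Nothing is expected to happen
        // here: the failure handling must wait for the pending savepoint. The test
        // would also pass without this call, hence the explicit comment.
        state.executor.triggerAll();
        String before = state.transitionedTo; // still null

        state.savepointFuture.complete("savepoint-path");
        state.executor.triggerAll();
        return before + " -> " + state.transitionedTo;
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // prints "null -> Restarting"
    }
}
```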



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -82,7 +82,9 @@ void testJobFailed() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph mockExecutionGraph =
                     new StateTrackingMockExecutionGraph();
-            StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph);
+            final CompletableFuture<String> savepointFuture = CompletableFuture.completedFuture("");

Review Comment:
   I guess we should extend the test name here to something like `testJobFailedWithSavepointCreationSuccessful` to better distinguish this test from the other test cases.
   
   Additionally, shouldn't we verify that a `StopWithSavepointStopException` is part of the cause for this specific test case?
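   One way to express that check, as a self-contained sketch: `findThrowable` walks the cause chain (Flink's `org.apache.flink.util.ExceptionUtils` provides a `findThrowable` helper with a similar purpose; the version here is reimplemented to keep the example standalone). `StopWithSavepointStopException` below is a hypothetical stand-in class named after the exception in the comment, not the real Flink type, and `operationFailedDueToStop` is likewise hypothetical.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CauseChainCheckSketch {

    /** Hypothetical stand-in named after the exception mentioned in the review. */
    static final class StopWithSavepointStopException extends Exception {
        StopWithSavepointStopException(String savepointPath) {
            super("Job failed after savepoint " + savepointPath + " was created.");
        }
    }

    /** Returns the first throwable of the given type in the cause chain, if any. */
    static <T extends Throwable> Optional<T> findThrowable(Throwable throwable, Class<T> type) {
        // Identity-based set so that a cyclic cause chain cannot loop forever.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = throwable;
        while (current != null && seen.add(current)) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    /** True if the future failed with a StopWithSavepointStopException somewhere in its causes. */
    static boolean operationFailedDueToStop(CompletableFuture<String> operationFuture) {
        try {
            operationFuture.join();
            return false; // completed normally
        } catch (CompletionException e) {
            // join() wraps the original failure in a CompletionException.
            return findThrowable(e, StopWithSavepointStopException.class).isPresent();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(
                new RuntimeException("job failed", new StopWithSavepointStopException("sp-1")));
        System.out.println(operationFailedDueToStop(future)); // prints true
    }
}
```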



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -82,7 +82,9 @@ void testJobFailed() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph mockExecutionGraph =
                     new StateTrackingMockExecutionGraph();
-            StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph);
+            final CompletableFuture<String> savepointFuture = CompletableFuture.completedFuture("");

Review Comment:
   > needs another explicit test for job fails + successful savepoint
   
   Is that the test you're referring to in your [comment above](https://github.com/apache/flink/pull/19968#pullrequestreview-1010663103)?
   
   `testJobFailed` pretty much covers the scenario, doesn't it?



-- 
This is an automated message from the Apache Git Service.