Posted to commits@flink.apache.org by ch...@apache.org on 2022/06/22 06:55:25 UTC

[flink] branch master updated (f88d276bb42 -> 402ca9671e0)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from f88d276bb42 fixup! fixup! [FLINK-26762][docs] Document overdraft buffers
     new 006c4f07a60 [hotfix][tests] Cleanup StopWithSavepointTest
     new d13cb056912 [FLINK-27972][coordination] Wait until savepoint operation is complete
     new 402ca9671e0 [FLINK-27972][tests] Forbid multiple state transitions

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../scheduler/adaptive/StopWithSavepoint.java      | 179 +++++++++----
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |  19 +-
 .../scheduler/adaptive/StopWithSavepointTest.java  | 280 +++++++++++++--------
 3 files changed, 329 insertions(+), 149 deletions(-)


[flink] 02/03: [FLINK-27972][coordination] Wait until savepoint operation is complete

Posted by ch...@apache.org.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d13cb056912d9011df96671c3bd60299a59a1117
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Jun 15 12:40:32 2022 +0200

    [FLINK-27972][coordination] Wait until savepoint operation is complete
---
 .../scheduler/adaptive/StopWithSavepoint.java      | 179 +++++++++++++++------
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |  19 ++-
 .../scheduler/adaptive/StopWithSavepointTest.java  |  93 +++++++++--
 3 files changed, 231 insertions(+), 60 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
index de4584e0042..fbd9ffde3eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
@@ -49,19 +49,48 @@ import java.util.concurrent.ScheduledFuture;
  * of the operation) is made available via the "operationFuture" to the user. This operation is only
  * considered successfully if the "savepointFuture" completed successfully, and the job reached the
  * terminal state FINISHED.
+ *
+ * <p>This state has to cover several failure scenarios, depending on whether the savepoint
+ * succeeds/fails and the job succeeds/fails/keeps running.
+ *
+ * <ul>
+ *   <li>Savepoint succeeds, job succeeds - The happy path we like to see.
+ *   <li>Savepoint fails, job fails - The generic failure case. Something happened during
+ *       checkpointing on the TM side that also failed the task; fail the savepoint operation and
+ *       restart the job.
+ *   <li>Savepoint succeeds, job fails - Some issue occurred in notifyCheckpointComplete or during
+ *       the job shutdown. Fail the savepoint operation and job, but inform the user about the
+ *       created savepoint.
+ *   <li>Savepoint fails, job keeps running - The savepoint failed due to an error on the JM side,
+ *       before we ever triggered anything on the TM side. Fail the savepoint operation, but keep
+ *       the job running.
+ * </ul>
+ *
+ * <p>This is further complicated by this information being transmitted via 2 separate RPCs from
+ * TM->JM, with the {@code savepointFuture} not being completed in the main thread, introducing
+ * ordering/lateness issues. Be careful to not liberally use {@link Context#runIfState(State,
+ * Runnable, Duration)} because it can result in a message being lost if multiple operations are
+ * queued and the first initiates a state transition.
  */
 class StopWithSavepoint extends StateWithExecutionGraph {
 
     private final Context context;
+    /**
+     * The result future of this operation, containing the path to the savepoint. This is the future
+     * that other components (e.g., the REST API) wait for.
+     *
+     * <p>Must only be completed successfully if the savepoint was created and the job has FINISHED.
+     */
     private final CompletableFuture<String> operationFuture;
 
     private final CheckpointScheduling checkpointScheduling;
 
-    private boolean hasFullyFinished = false;
-
-    @Nullable private String savepoint = null;
-
     @Nullable private Throwable operationFailureCause;
+    private boolean hasPendingStateTransition = false;
+
+    // be careful when applying operations on this future that can trigger state transitions,
+    // as several other methods do the same and we mustn't trigger multiple transitions!
+    private final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();
 
     StopWithSavepoint(
             Context context,
@@ -86,38 +115,29 @@ class StopWithSavepoint extends StateWithExecutionGraph {
         this.operationFuture = new CompletableFuture<>();
 
         FutureUtils.assertNoException(
-                savepointFuture.handle(
-                        (savepointLocation, throwable) -> {
-                            // make sure we handle the future completion in the main thread and
-                            // outside the constructor (where state transitions are not allowed)
-                            context.runIfState(
-                                    this,
-                                    () -> handleSavepointCompletion(savepointLocation, throwable),
-                                    Duration.ZERO);
+                internalSavepointFuture.exceptionally(
+                        cause -> {
+                            onSavepointFailure(cause);
                             return null;
                         }));
-    }
 
-    private void handleSavepointCompletion(
-            @Nullable String savepoint, @Nullable Throwable throwable) {
-        if (hasFullyFinished) {
-            Preconditions.checkState(
-                    throwable == null,
-                    "A savepoint should never fail after a job has been terminated via stop-with-savepoint.");
-            completeOperationAndGoToFinished(savepoint);
-        } else {
-            if (throwable != null) {
-                operationFailureCause = throwable;
-                checkpointScheduling.startCheckpointScheduler();
-                context.goToExecuting(
-                        getExecutionGraph(),
-                        getExecutionGraphHandler(),
-                        getOperatorCoordinatorHandler(),
-                        getFailures());
-            } else {
-                this.savepoint = savepoint;
-            }
-        }
+        // this is a roundabout way of splicing the completion of the future into the main thread.
+        // allows other methods to apply synchronous operations on the future without having to
+        // worry about the main thread.
+        savepointFuture.handle(
+                (savepoint, error) -> {
+                    context.runIfState(
+                            this,
+                            () -> {
+                                if (error != null) {
+                                    internalSavepointFuture.completeExceptionally(error);
+                                } else {
+                                    internalSavepointFuture.complete(savepoint);
+                                }
+                            },
+                            Duration.ZERO);
+                    return null;
+                });
     }
 
     @Override
@@ -130,8 +150,19 @@ class StopWithSavepoint extends StateWithExecutionGraph {
         super.onLeave(newState);
     }
 
+    /**
+     * Cancel the job and fail the savepoint operation future.
+     *
+     * <p>We don't wait for the {@link #internalSavepointFuture} here so that users can still cancel
+     * a job if the savepoint takes too long (or gets stuck).
+     *
+     * <p>Since we don't actually cancel the savepoint (for which there is no API to do so), there
+     * is a small risk that the job is cancelled at the very moment that the savepoint completes,
+     * causing it to not be reported to the user. See FLINK-28127.
+     */
     @Override
     public void cancel() {
+        operationFailureCause = new FlinkException("The job was cancelled.");
         context.goToCanceling(
                 getExecutionGraph(),
                 getExecutionGraphHandler(),
@@ -144,29 +175,81 @@ class StopWithSavepoint extends StateWithExecutionGraph {
         return JobStatus.RUNNING;
     }
 
+    /**
+     * Restarts the checkpoint scheduler and, if only the savepoint failed without a task failure /
+     * job termination, transitions back to {@link Executing}.
+     *
+     * <p>This method must assume that {@link #onFailure}/{@link #onGloballyTerminalState} MAY
+     * already be waiting for the savepoint operation to complete, itching to trigger a state
+     * transition (hence the {@link #hasPendingStateTransition} check).
+     *
+     * <p>If the above is violated (e.g., by always transitioning into another state), then
+     * depending on other implementation details something very bad will happen, like the scheduler
+     * crashing the JVM because it attempted multiple state transitions OR effectively dropping the
+     * onFailure/onGloballyTerminalState call OR we trigger state transitions while we are already
+     * in another state.
+     *
+     * <p>For maintainability reasons this method should not mutate any state that affects state
+     * transitions in other methods.
+     */
+    private void onSavepointFailure(Throwable cause) {
+        // revert side-effect of Executing#stopWithSavepoint
+        checkpointScheduling.startCheckpointScheduler();
+        // a task failed concurrently; defer the error handling to onFailure()
+        // otherwise we will attempt 2 state transitions, which is forbidden
+        if (!hasPendingStateTransition) {
+            operationFailureCause = cause;
+            context.goToExecuting(
+                    getExecutionGraph(),
+                    getExecutionGraphHandler(),
+                    getOperatorCoordinatorHandler(),
+                    getFailures());
+        }
+    }
+
     @Override
     void onFailure(Throwable cause) {
-        operationFailureCause = cause;
-        if (savepoint == null) {
-            FailureResultUtil.restartOrFail(context.howToHandleFailure(cause), context, this);
-        } else {
-            // savepoint has been create successfully, but the job failed while committing side
-            // effects
-            final StopWithSavepointStoppingException ex =
-                    new StopWithSavepointStoppingException(savepoint, this.getJobId(), cause);
-            this.operationFuture.completeExceptionally(ex);
-            FailureResultUtil.restartOrFail(context.howToHandleFailure(ex), context, this);
+        if (hasPendingStateTransition) {
+            // the error handling remains the same independent of how many tasks have failed
+            // we don't want to initiate the same state transition multiple times, so we exit early
+            // this could also be achieved via Context#runIfState, but that'd spam the logs
+            return;
         }
+        hasPendingStateTransition = true;
+
+        FutureUtils.assertNoException(
+                internalSavepointFuture.handle(
+                        (savepoint, savepointError) -> {
+                            // if savepointError is null then the savepoint has been created
+                            // successfully, but the job failed while committing side effects,
+                            // so we enrich the exception for the user
+                            final Throwable ex =
+                                    savepointError != null
+                                            ? cause
+                                            : new StopWithSavepointStoppingException(
+                                                    savepoint, getJobId(), cause);
+                            operationFailureCause = ex;
+                            FailureResultUtil.restartOrFail(
+                                    context.howToHandleFailure(ex), context, this);
+                            return null;
+                        }));
     }
 
     @Override
     void onGloballyTerminalState(JobStatus globallyTerminalState) {
         if (globallyTerminalState == JobStatus.FINISHED) {
-            if (savepoint == null) {
-                hasFullyFinished = true;
-            } else {
-                completeOperationAndGoToFinished(savepoint);
-            }
+            // do not set this in other cases
+            // handleGlobalFailure circles back to onFailure()
+            hasPendingStateTransition = true;
+            FutureUtils.assertNoException(
+                    internalSavepointFuture.handle(
+                            (savepoint, error) -> {
+                                Preconditions.checkState(
+                                        error == null,
+                                        "A savepoint should never fail after a job has been terminated via stop-with-savepoint.");
+                                completeOperationAndGoToFinished(savepoint);
+                                return null;
+                            }));
         } else {
             handleGlobalFailure(
                     new FlinkException(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 28200416199..fee595c49d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -65,6 +66,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
@@ -1166,7 +1168,11 @@ public class AdaptiveSchedulerTest extends TestLogger {
                 jobGraph ->
                         jobGraph.setSnapshotSettings(
                                 new JobCheckpointingSettings(
-                                        CheckpointCoordinatorConfiguration.builder().build(),
+                                        // set a large checkpoint interval so we can easily deduce
+                                        // the savepoints checkpoint id
+                                        CheckpointCoordinatorConfiguration.builder()
+                                                .setCheckpointInterval(Long.MAX_VALUE)
+                                                .build(),
                                         null));
         final CompletedCheckpointStore completedCheckpointStore =
                 new StandaloneCompletedCheckpointStore(1);
@@ -1190,6 +1196,17 @@ public class AdaptiveSchedulerTest extends TestLogger {
                             new TaskExecutionStateTransition(
                                     new TaskExecutionState(
                                             attemptId, ExecutionState.FAILED, expectedException)));
+
+                    // fail the savepoint so that the job terminates
+                    for (ExecutionAttemptID id : attemptIds) {
+                        scheduler.declineCheckpoint(
+                                new DeclineCheckpoint(
+                                        scheduler.requestJob().getJobId(),
+                                        id,
+                                        checkpointIDCounter.get() - 1,
+                                        new CheckpointException(
+                                                CheckpointFailureReason.IO_EXCEPTION)));
+                    }
                 };
 
         final Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
index 4b2df124670..8738e7dfb47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
 import org.apache.flink.runtime.scheduler.exceptionhistory.TestingAccessExecution;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLoggerExtension;
 
@@ -67,9 +68,12 @@ class StopWithSavepointTest {
             StopWithSavepoint sws =
                     createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture);
             ctx.setStopWithSavepoint(sws);
-            ctx.setExpectFinished(assertNonNull());
 
-            mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED);
+            sws.onGloballyTerminalState(JobStatus.FINISHED);
+            // this is a sanity check that we haven't scheduled a state transition
+            ctx.triggerExecutors();
+
+            ctx.setExpectFinished(assertNonNull());
             savepointFuture.complete(SAVEPOINT_PATH);
             ctx.triggerExecutors();
 
@@ -78,24 +82,32 @@ class StopWithSavepointTest {
     }
 
     @Test
-    void testJobFailed() throws Exception {
+    void testJobFailedAndSavepointOperationSucceeds() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph mockExecutionGraph =
                     new StateTrackingMockExecutionGraph();
-            StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph);
+            final CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+            StopWithSavepoint sws =
+                    createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture);
             ctx.setStopWithSavepoint(sws);
             ctx.setHowToHandleFailure(FailureResult::canNotRestart);
 
+            // fail job:
+            mockExecutionGraph.completeTerminationFuture(JobStatus.FAILED);
+            // this is a sanity check that we haven't scheduled a state transition
+            ctx.triggerExecutors();
+
             ctx.setExpectFailing(
                     failingArguments -> {
                         assertThat(failingArguments.getExecutionGraph().getState())
                                 .isEqualTo(JobStatus.FAILED);
                         assertThat(failingArguments.getFailureCause())
-                                .satisfies(FlinkAssertions.anyCauseMatches(FlinkException.class));
+                                .satisfies(
+                                        FlinkAssertions.anyCauseMatches(
+                                                StopWithSavepointStoppingException.class));
                     });
 
-            // fail job:
-            mockExecutionGraph.completeTerminationFuture(JobStatus.FAILED);
+            savepointFuture.complete(SAVEPOINT_PATH);
             ctx.triggerExecutors();
 
             assertThat(sws.getOperationFuture()).isCompletedExceptionally();
@@ -179,7 +191,8 @@ class StopWithSavepointTest {
     @Test
     void testRestartOnGlobalFailureIfRestartConfigured() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
-            StopWithSavepoint sws = createStopWithSavepoint(ctx);
+            final CompletableFuture<String> savepointFuture = CompletableFuture.completedFuture("");
+            StopWithSavepoint sws = createStopWithSavepoint(ctx, savepointFuture);
             ctx.setStopWithSavepoint(sws);
             ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
 
@@ -193,7 +206,8 @@ class StopWithSavepointTest {
     void testFailingOnGlobalFailureIfNoRestartConfigured() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
 
-            StopWithSavepoint sws = createStopWithSavepoint(ctx);
+            final CompletableFuture<String> savepointFuture = CompletableFuture.completedFuture("");
+            StopWithSavepoint sws = createStopWithSavepoint(ctx, savepointFuture);
             ctx.setStopWithSavepoint(sws);
             ctx.setHowToHandleFailure(FailureResult::canNotRestart);
 
@@ -211,7 +225,8 @@ class StopWithSavepointTest {
     void testFailingOnUpdateTaskExecutionStateWithNoRestart() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
-            StopWithSavepoint sws = createStopWithSavepoint(ctx, executionGraph);
+            final CompletableFuture<String> savepointFuture = CompletableFuture.completedFuture("");
+            StopWithSavepoint sws = createStopWithSavepoint(ctx, executionGraph, savepointFuture);
             ctx.setStopWithSavepoint(sws);
             ctx.setHowToHandleFailure(FailureResult::canNotRestart);
 
@@ -238,7 +253,8 @@ class StopWithSavepointTest {
     void testRestartingOnUpdateTaskExecutionStateWithRestart() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
-            StopWithSavepoint sws = createStopWithSavepoint(ctx, executionGraph);
+            final CompletableFuture<String> savepointFuture = CompletableFuture.completedFuture("");
+            StopWithSavepoint sws = createStopWithSavepoint(ctx, executionGraph, savepointFuture);
             ctx.setStopWithSavepoint(sws);
             ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
 
@@ -337,6 +353,52 @@ class StopWithSavepointTest {
         }
     }
 
+    @Test
+    void testOnFailureWaitsForSavepointCompletion() throws Exception {
+        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
+            CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+            StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
+            StopWithSavepoint sws =
+                    createStopWithSavepoint(
+                            ctx, mockStopWithSavepointOperations, executionGraph, savepointFuture);
+            ctx.setStopWithSavepoint(sws);
+
+            ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
+
+            sws.onFailure(new Exception("task failure"));
+            // this is a sanity check that we haven't scheduled a state transition
+            ctx.triggerExecutors();
+
+            ctx.setExpectRestarting(assertNonNull());
+            savepointFuture.complete(SAVEPOINT_PATH);
+            ctx.triggerExecutors();
+        }
+    }
+
+    @Test
+    void testConcurrentSavepointFailureAndGloballyTerminalStateCauseRestart() throws Exception {
+        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
+            CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+            StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
+            StopWithSavepoint sws =
+                    createStopWithSavepoint(
+                            ctx, mockStopWithSavepointOperations, executionGraph, savepointFuture);
+            ctx.setStopWithSavepoint(sws);
+
+            ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
+
+            sws.onFailure(new Exception("task failure"));
+            // this is a sanity check that we haven't scheduled a state transition
+            ctx.triggerExecutors();
+
+            ctx.setExpectRestarting(assertNonNull());
+            savepointFuture.completeExceptionally(new Exception("savepoint failure"));
+            ctx.triggerExecutors();
+        }
+    }
+
     @Test
     void testEnsureCheckpointSchedulerIsStartedAgain() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
@@ -366,6 +428,15 @@ class StopWithSavepointTest {
                 new CompletableFuture<>());
     }
 
+    private static StopWithSavepoint createStopWithSavepoint(
+            MockStopWithSavepointContext ctx, CompletableFuture<String> savepointFuture) {
+        return createStopWithSavepoint(
+                ctx,
+                new MockCheckpointScheduling(),
+                new StateTrackingMockExecutionGraph(),
+                savepointFuture);
+    }
+
     private static StopWithSavepoint createStopWithSavepoint(
             MockStopWithSavepointContext ctx,
             ExecutionGraph executionGraph,
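
Editorial note on the commit above: the core idea of the StopWithSavepoint change is that onFailure() no longer transitions immediately. It registers a callback on a future that is only completed from the main thread, and a flag keeps a concurrent savepoint failure from triggering a second transition. The following is a minimal, single-threaded sketch of that pattern. It is not Flink code: the class SavepointWaitSketch, the queue standing in for the main-thread executor, and the transition strings are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for the StopWithSavepoint state; not Flink code. */
class SavepointWaitSketch {
    /** Stand-in for the main-thread executor: queued tasks run only when drained. */
    final Queue<Runnable> mainThread = new ArrayDeque<>();
    /** Records every simulated state transition, in order. */
    final List<String> transitions = new ArrayList<>();
    /** The externally completed future (in this sketch: completed by the caller). */
    final CompletableFuture<String> savepointFuture = new CompletableFuture<>();
    /** Only ever completed from within a main-thread task. */
    private final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();
    private boolean hasPendingStateTransition = false;

    SavepointWaitSketch() {
        // Savepoint failed and no task failure is waiting: go back to executing.
        internalSavepointFuture.exceptionally(cause -> {
            if (!hasPendingStateTransition) {
                transitions.add("Executing");
            }
            return null;
        });
        // Splice the completion of the external future into the "main thread".
        savepointFuture.handle((path, error) -> {
            mainThread.add(() -> {
                if (error != null) {
                    internalSavepointFuture.completeExceptionally(error);
                } else {
                    internalSavepointFuture.complete(path);
                }
            });
            return null;
        });
    }

    /** A task failed: wait for the savepoint result before transitioning. */
    void onFailure(Throwable cause) {
        if (hasPendingStateTransition) {
            return; // a transition is already queued behind the savepoint future
        }
        hasPendingStateTransition = true;
        internalSavepointFuture.handle((path, savepointError) -> {
            transitions.add(savepointError == null
                    ? "Restarting (savepoint at " + path + ")"
                    : "Restarting (no savepoint)");
            return null;
        });
    }

    /** Runs all queued main-thread tasks. */
    void drainMainThread() {
        Runnable task;
        while ((task = mainThread.poll()) != null) {
            task.run();
        }
    }

    public static void main(String[] args) {
        SavepointWaitSketch state = new SavepointWaitSketch();
        state.onFailure(new RuntimeException("task failure"));
        state.drainMainThread();
        System.out.println(state.transitions); // no transition yet: savepoint still pending

        state.savepointFuture.complete("test://savepoint/path");
        state.drainMainThread();
        System.out.println(state.transitions); // exactly one transition recorded
    }
}
```

The sketch ignores thread safety on purpose (a real main-thread executor would hand tasks over from other threads) and collapses restart-or-fail handling into a single string. The flag check in the exceptionally() callback is what makes the result independent of the order in which the two callbacks on the internal future run.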


[flink] 03/03: [FLINK-27972][tests] Forbid multiple state transitions

Posted by ch...@apache.org.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 402ca9671e028ea008ffa6b332925116d4f317fe
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Jun 15 11:30:20 2022 +0200

    [FLINK-27972][tests] Forbid multiple state transitions
---
 .../runtime/scheduler/adaptive/StopWithSavepointTest.java    | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
index 8738e7dfb47..1ee3f5e5194 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
@@ -545,6 +545,9 @@ class StopWithSavepointTest {
                 ExecutionGraphHandler executionGraphHandler,
                 OperatorCoordinatorHandler operatorCoordinatorHandler,
                 List<ExceptionHistoryEntry> failureCollection) {
+            if (hadStateTransition) {
+                throw new IllegalStateException("Only one state transition is allowed.");
+            }
             simulateTransitionToState(Canceling.class);
 
             cancellingStateValidator.validateInput(
@@ -560,6 +563,9 @@ class StopWithSavepointTest {
                 OperatorCoordinatorHandler operatorCoordinatorHandler,
                 Duration backoffTime,
                 List<ExceptionHistoryEntry> failureCollection) {
+            if (hadStateTransition) {
+                throw new IllegalStateException("Only one state transition is allowed.");
+            }
             simulateTransitionToState(Restarting.class);
             restartingStateValidator.validateInput(
                     new ExecutingTest.RestartingArguments(
@@ -577,6 +583,9 @@ class StopWithSavepointTest {
                 OperatorCoordinatorHandler operatorCoordinatorHandler,
                 Throwable failureCause,
                 List<ExceptionHistoryEntry> failureCollection) {
+            if (hadStateTransition) {
+                throw new IllegalStateException("Only one state transition is allowed.");
+            }
             simulateTransitionToState(Failing.class);
             failingStateValidator.validateInput(
                     new ExecutingTest.FailingArguments(
@@ -593,6 +602,9 @@ class StopWithSavepointTest {
                 ExecutionGraphHandler executionGraphHandler,
                 OperatorCoordinatorHandler operatorCoordinatorHandler,
                 List<ExceptionHistoryEntry> failureCollection) {
+            if (hadStateTransition) {
+                throw new IllegalStateException("Only one state transition is allowed.");
+            }
             simulateTransitionToState(Executing.class);
             executingStateTransition.validateInput(
                     new ExecutingTest.CancellingArguments(
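
Editorial note on the commit above: each goTo* method of the mock context now rejects a second state transition. A minimal sketch of such a guard is shown below. The class SingleTransitionContext and its members are hypothetical and far simpler than MockStopWithSavepointContext. In particular, the real mock checks a hadStateTransition field whose bookkeeping is not visible in this diff, so the sketch tracks the current state itself.

```java
/** Hypothetical test double that permits exactly one state transition; not Flink code. */
class SingleTransitionContext {
    /** Name of the state that was transitioned into, or null if none yet. */
    private String currentState = null;

    void goToCanceling() {
        transitionTo("Canceling");
    }

    void goToRestarting() {
        transitionTo("Restarting");
    }

    void goToFailing() {
        transitionTo("Failing");
    }

    void goToExecuting() {
        transitionTo("Executing");
    }

    String getCurrentState() {
        return currentState;
    }

    private void transitionTo(String target) {
        // A second transition indicates a bug in the state under test, so fail loudly.
        if (currentState != null) {
            throw new IllegalStateException("Only one state transition is allowed.");
        }
        currentState = target;
    }

    public static void main(String[] args) {
        SingleTransitionContext ctx = new SingleTransitionContext();
        ctx.goToRestarting();
        try {
            ctx.goToExecuting();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Routing every goTo* method through one private helper keeps the check in a single place. The commit instead repeats the check in each method, which fits a mock whose methods already carry per-state validation logic.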


[flink] 01/03: [hotfix][tests] Cleanup StopWithSavepointTest

Posted by ch...@apache.org.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 006c4f07a60e1a4aa9caa3e9b1a0db77e695d954
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Jun 15 10:46:57 2022 +0200

    [hotfix][tests] Cleanup StopWithSavepointTest
---
 .../scheduler/adaptive/StopWithSavepointTest.java  | 179 ++++++++++-----------
 1 file changed, 88 insertions(+), 91 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
index 8c7e3ab9a0c..4b2df124670 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
@@ -29,9 +30,12 @@ import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
 import org.apache.flink.runtime.scheduler.exceptionhistory.TestingAccessExecution;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -42,18 +46,19 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 /** Tests for the {@link StopWithSavepoint} state. */
-public class StopWithSavepointTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class StopWithSavepointTest {
+    private static final Logger LOG = LoggerFactory.getLogger(StopWithSavepointTest.class);
+
     private static final String SAVEPOINT_PATH = "test://savepoint/path";
 
     @Test
-    public void testFinishedOnSuccessfulStopWithSavepoint() throws Exception {
+    void testFinishedOnSuccessfulStopWithSavepoint() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph mockExecutionGraph =
                     new StateTrackingMockExecutionGraph();
@@ -68,12 +73,12 @@ public class StopWithSavepointTest extends TestLogger {
             savepointFuture.complete(SAVEPOINT_PATH);
             ctx.triggerExecutors();
 
-            assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH));
+            assertThat(sws.getOperationFuture().get()).isEqualTo(SAVEPOINT_PATH);
         }
     }
 
     @Test
-    public void testJobFailed() throws Exception {
+    void testJobFailed() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph mockExecutionGraph =
                     new StateTrackingMockExecutionGraph();
@@ -83,24 +88,22 @@ public class StopWithSavepointTest extends TestLogger {
 
             ctx.setExpectFailing(
                     failingArguments -> {
-                        assertThat(
-                                failingArguments.getExecutionGraph().getState(),
-                                is(JobStatus.FAILED));
-                        assertThat(
-                                failingArguments.getFailureCause(),
-                                containsCause(FlinkException.class));
+                        assertThat(failingArguments.getExecutionGraph().getState())
+                                .isEqualTo(JobStatus.FAILED);
+                        assertThat(failingArguments.getFailureCause())
+                                .satisfies(FlinkAssertions.anyCauseMatches(FlinkException.class));
                     });
 
             // fail job:
             mockExecutionGraph.completeTerminationFuture(JobStatus.FAILED);
             ctx.triggerExecutors();
 
-            assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true));
+            assertThat(sws.getOperationFuture()).isCompletedExceptionally();
         }
     }
 
     @Test
-    public void testJobFailedAndSavepointOperationFails() throws Exception {
+    void testJobFailedAndSavepointOperationFails() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph mockExecutionGraph =
                     new StateTrackingMockExecutionGraph();
@@ -112,12 +115,10 @@ public class StopWithSavepointTest extends TestLogger {
 
             ctx.setExpectFailing(
                     failingArguments -> {
-                        assertThat(
-                                failingArguments.getExecutionGraph().getState(),
-                                is(JobStatus.FAILED));
-                        assertThat(
-                                failingArguments.getFailureCause(),
-                                containsCause(FlinkException.class));
+                        assertThat(failingArguments.getExecutionGraph().getState())
+                                .isEqualTo(JobStatus.FAILED);
+                        assertThat(failingArguments.getFailureCause())
+                                .satisfies(FlinkAssertions.anyCauseMatches(FlinkException.class));
                     });
 
             // fail job:
@@ -125,12 +126,12 @@ public class StopWithSavepointTest extends TestLogger {
             savepointFuture.completeExceptionally(new RuntimeException());
             ctx.triggerExecutors();
 
-            assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true));
+            assertThat(sws.getOperationFuture()).isCompletedExceptionally();
         }
     }
 
     @Test
-    public void testJobFinishedBeforeSavepointFuture() throws Exception {
+    void testJobFinishedBeforeSavepointFuture() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph mockExecutionGraph =
                     new StateTrackingMockExecutionGraph();
@@ -146,12 +147,12 @@ public class StopWithSavepointTest extends TestLogger {
             savepointFuture.complete(SAVEPOINT_PATH);
             ctx.triggerExecutors();
 
-            assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH));
+            assertThat(sws.getOperationFuture().get()).isEqualTo(SAVEPOINT_PATH);
         }
     }
 
     @Test
-    public void testTransitionToCancellingOnCancel() throws Exception {
+    void testTransitionToCancellingOnCancel() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StopWithSavepoint sws = createStopWithSavepoint(ctx);
             ctx.setStopWithSavepoint(sws);
@@ -162,12 +163,13 @@ public class StopWithSavepointTest extends TestLogger {
     }
 
     @Test
-    public void testTransitionToFinishedOnSuspend() throws Exception {
+    void testTransitionToFinishedOnSuspend() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StopWithSavepoint sws = createStopWithSavepoint(ctx);
             ctx.setExpectFinished(
                     archivedExecutionGraph -> {
-                        assertThat(archivedExecutionGraph.getState(), is(JobStatus.SUSPENDED));
+                        assertThat(archivedExecutionGraph.getState())
+                                .isEqualTo(JobStatus.SUSPENDED);
                     });
 
             sws.suspend(new RuntimeException());
@@ -175,7 +177,7 @@ public class StopWithSavepointTest extends TestLogger {
     }
 
     @Test
-    public void testRestartOnGlobalFailureIfRestartConfigured() throws Exception {
+    void testRestartOnGlobalFailureIfRestartConfigured() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StopWithSavepoint sws = createStopWithSavepoint(ctx);
             ctx.setStopWithSavepoint(sws);
@@ -188,7 +190,7 @@ public class StopWithSavepointTest extends TestLogger {
     }
 
     @Test
-    public void testFailingOnGlobalFailureIfNoRestartConfigured() throws Exception {
+    void testFailingOnGlobalFailureIfNoRestartConfigured() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
 
             StopWithSavepoint sws = createStopWithSavepoint(ctx);
@@ -197,9 +199,8 @@ public class StopWithSavepointTest extends TestLogger {
 
             ctx.setExpectFailing(
                     failingArguments -> {
-                        assertThat(
-                                failingArguments.getFailureCause(),
-                                containsCause(RuntimeException.class));
+                        assertThat(failingArguments.getFailureCause())
+                                .satisfies(FlinkAssertions.anyCauseMatches(RuntimeException.class));
                     });
 
             sws.handleGlobalFailure(new RuntimeException());
@@ -207,7 +208,7 @@ public class StopWithSavepointTest extends TestLogger {
     }
 
     @Test
-    public void testFailingOnUpdateTaskExecutionStateWithNoRestart() throws Exception {
+    void testFailingOnUpdateTaskExecutionStateWithNoRestart() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
             StopWithSavepoint sws = createStopWithSavepoint(ctx, executionGraph);
@@ -216,9 +217,8 @@ public class StopWithSavepointTest extends TestLogger {
 
             ctx.setExpectFailing(
                     failingArguments -> {
-                        assertThat(
-                                failingArguments.getFailureCause(),
-                                containsCause(RuntimeException.class));
+                        assertThat(failingArguments.getFailureCause())
+                                .satisfies(FlinkAssertions.anyCauseMatches(RuntimeException.class));
                     });
 
             Exception exception = new RuntimeException();
@@ -230,12 +230,12 @@ public class StopWithSavepointTest extends TestLogger {
             executionGraph.registerExecution(execution);
             TaskExecutionStateTransition taskExecutionStateTransition =
                     ExecutingTest.createFailingStateTransition(execution.getAttemptId(), exception);
-            assertThat(sws.updateTaskExecutionState(taskExecutionStateTransition), is(true));
+            assertThat(sws.updateTaskExecutionState(taskExecutionStateTransition)).isTrue();
         }
     }
 
     @Test
-    public void testRestartingOnUpdateTaskExecutionStateWithRestart() throws Exception {
+    void testRestartingOnUpdateTaskExecutionStateWithRestart() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
             StopWithSavepoint sws = createStopWithSavepoint(ctx, executionGraph);
@@ -253,62 +253,59 @@ public class StopWithSavepointTest extends TestLogger {
             executionGraph.registerExecution(execution);
             TaskExecutionStateTransition taskExecutionStateTransition =
                     ExecutingTest.createFailingStateTransition(execution.getAttemptId(), exception);
-            assertThat(sws.updateTaskExecutionState(taskExecutionStateTransition), is(true));
+            assertThat(sws.updateTaskExecutionState(taskExecutionStateTransition)).isTrue();
         }
     }
 
     @Test
-    public void testExceptionalOperationFutureCompletionOnLeaveWhileWaitingOnSavepointCompletion()
+    void testExceptionalOperationFutureCompletionOnLeaveWhileWaitingOnSavepointCompletion()
             throws Exception {
-        MockStopWithSavepointContext ctx = new MockStopWithSavepointContext();
-        StopWithSavepoint sws = createStopWithSavepoint(ctx);
-        ctx.setStopWithSavepoint(sws);
-
-        sws.onLeave(Canceling.class);
+        final StopWithSavepoint sws;
+        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
+            sws = createStopWithSavepoint(ctx);
+            ctx.setStopWithSavepoint(sws);
 
-        ctx.close();
-        assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true));
+            sws.onLeave(Canceling.class);
+        }
+        assertThat(sws.getOperationFuture()).isCompletedExceptionally();
     }
 
     @Test
-    public void testExceptionalSavepointCompletionLeadsToExceptionalOperationFutureCompletion()
+    void testExceptionalSavepointCompletionLeadsToExceptionalOperationFutureCompletion()
             throws Exception {
-        MockStopWithSavepointContext ctx = new MockStopWithSavepointContext();
-        CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
-        CompletableFuture<String> savepointFuture = new CompletableFuture<>();
-        StopWithSavepoint sws =
-                createStopWithSavepoint(ctx, mockStopWithSavepointOperations, savepointFuture);
-        ctx.setStopWithSavepoint(sws);
-        ctx.setExpectExecuting(assertNonNull());
-
-        savepointFuture.completeExceptionally(new RuntimeException("Test error"));
-
-        ctx.close();
-        assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true));
+        final StopWithSavepoint sws;
+        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
+            CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+            sws = createStopWithSavepoint(ctx, mockStopWithSavepointOperations, savepointFuture);
+            ctx.setStopWithSavepoint(sws);
+            ctx.setExpectExecuting(assertNonNull());
+
+            savepointFuture.completeExceptionally(new RuntimeException("Test error"));
+        }
+        assertThat(sws.getOperationFuture()).isCompletedExceptionally();
     }
 
     @Test
-    public void testErrorCreatingSavepointLeadsToTransitionToExecutingState() throws Exception {
-        MockStopWithSavepointContext ctx = new MockStopWithSavepointContext();
-        CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
-        CompletableFuture<String> savepointFuture = new CompletableFuture<>();
-        StopWithSavepoint sws =
-                createStopWithSavepoint(ctx, mockStopWithSavepointOperations, savepointFuture);
-        ctx.setStopWithSavepoint(sws);
-        ctx.setExpectExecuting(
-                executingArguments ->
-                        assertThat(
-                                executingArguments.getExecutionGraph().getState(),
-                                is(JobStatus.RUNNING)));
-
-        savepointFuture.completeExceptionally(new RuntimeException("Test error"));
-
-        ctx.close();
-        assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true));
+    void testErrorCreatingSavepointLeadsToTransitionToExecutingState() throws Exception {
+        final StopWithSavepoint sws;
+        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
+            CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+            sws = createStopWithSavepoint(ctx, mockStopWithSavepointOperations, savepointFuture);
+            ctx.setStopWithSavepoint(sws);
+            ctx.setExpectExecuting(
+                    executingArguments ->
+                            assertThat(executingArguments.getExecutionGraph().getState())
+                                    .isEqualTo(JobStatus.RUNNING));
+
+            savepointFuture.completeExceptionally(new RuntimeException("Test error"));
+        }
+        assertThat(sws.getOperationFuture()).isCompletedExceptionally();
     }
 
     @Test
-    public void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception {
+    void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
             CompletableFuture<String> savepointFuture = new CompletableFuture<>();
@@ -336,17 +333,17 @@ public class StopWithSavepointTest extends TestLogger {
             executionGraph.registerExecution(execution);
             TaskExecutionStateTransition taskExecutionStateTransition =
                     ExecutingTest.createFailingStateTransition(execution.getAttemptId(), exception);
-            assertThat(sws.updateTaskExecutionState(taskExecutionStateTransition), is(true));
+            assertThat(sws.updateTaskExecutionState(taskExecutionStateTransition)).isTrue();
         }
     }
 
     @Test
-    public void testEnsureCheckpointSchedulerIsStartedAgain() throws Exception {
+    void testEnsureCheckpointSchedulerIsStartedAgain() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             MockCheckpointScheduling mockStopWithSavepointOperations =
                     new MockCheckpointScheduling();
 
-            assertThat(mockStopWithSavepointOperations.isCheckpointSchedulerStarted(), is(false));
+            assertThat(mockStopWithSavepointOperations.isCheckpointSchedulerStarted()).isFalse();
 
             CompletableFuture<String> savepointFuture = new CompletableFuture<>();
             StopWithSavepoint sws =
@@ -357,11 +354,11 @@ public class StopWithSavepointTest extends TestLogger {
             // a failure should start the scheduler again
             savepointFuture.completeExceptionally(new RuntimeException("Test error"));
             ctx.triggerExecutors();
-            assertThat(mockStopWithSavepointOperations.isCheckpointSchedulerStarted(), is(true));
+            assertThat(mockStopWithSavepointOperations.isCheckpointSchedulerStarted()).isTrue();
         }
     }
 
-    private StopWithSavepoint createStopWithSavepoint(MockStopWithSavepointContext ctx) {
+    private static StopWithSavepoint createStopWithSavepoint(MockStopWithSavepointContext ctx) {
         return createStopWithSavepoint(
                 ctx,
                 new MockCheckpointScheduling(),
@@ -369,7 +366,7 @@ public class StopWithSavepointTest extends TestLogger {
                 new CompletableFuture<>());
     }
 
-    private StopWithSavepoint createStopWithSavepoint(
+    private static StopWithSavepoint createStopWithSavepoint(
             MockStopWithSavepointContext ctx,
             ExecutionGraph executionGraph,
             CompletableFuture<String> savepointFuture) {
@@ -377,12 +374,12 @@ public class StopWithSavepointTest extends TestLogger {
                 ctx, new MockCheckpointScheduling(), executionGraph, savepointFuture);
     }
 
-    private StopWithSavepoint createStopWithSavepoint(
+    private static StopWithSavepoint createStopWithSavepoint(
             MockStopWithSavepointContext ctx, ExecutionGraph executionGraph) {
         return createStopWithSavepoint(ctx, executionGraph, new CompletableFuture<>());
     }
 
-    private StopWithSavepoint createStopWithSavepoint(
+    private static StopWithSavepoint createStopWithSavepoint(
             MockStopWithSavepointContext ctx,
             CheckpointScheduling checkpointScheduling,
             CompletableFuture<String> savepointFuture) {
@@ -390,7 +387,7 @@ public class StopWithSavepointTest extends TestLogger {
                 ctx, checkpointScheduling, new StateTrackingMockExecutionGraph(), savepointFuture);
     }
 
-    private StopWithSavepoint createStopWithSavepoint(
+    private static StopWithSavepoint createStopWithSavepoint(
             MockStopWithSavepointContext ctx,
             CheckpointScheduling checkpointScheduling,
             ExecutionGraph executionGraph,
@@ -398,7 +395,7 @@ public class StopWithSavepointTest extends TestLogger {
         final ExecutionGraphHandler executionGraphHandler =
                 new ExecutionGraphHandler(
                         executionGraph,
-                        log,
+                        LOG,
                         ctx.getMainThreadExecutor(),
                         ctx.getMainThreadExecutor());
         OperatorCoordinatorHandler operatorCoordinatorHandler =
@@ -412,7 +409,7 @@ public class StopWithSavepointTest extends TestLogger {
                 executionGraphHandler,
                 operatorCoordinatorHandler,
                 checkpointScheduling,
-                log,
+                LOG,
                 ClassLoader.getSystemClassLoader(),
                 savepointFuture,
                 new ArrayList<>());