Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/26 13:31:12 UTC

[flink] branch master updated (ebde3ab -> a2c7378)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from ebde3ab  [hotfix][docs] Fix computed columns documentation
     new 8b3d620  [hotfix][tests] Adds log message to MiniClusterWithClientResource shutdown
     new 25ef419  [hotfix][task] Interrupt source legacy thread on failure.
     new 1e99f0a  [hotfix][test] Adds unit test for local and global failure happened concurrently
     new f384b32  [FLINK-21030][runtime] Adds trigger for global failover
     new a2c7378  [hotfix][test] Removes unused jobId parameter

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...anCalculator.java => CheckpointScheduling.java} |  19 +-
 .../flink/runtime/scheduler/SchedulerBase.java     | 103 ++++----
 .../StopWithSavepointTerminationHandler.java       |  71 +++++
 .../StopWithSavepointTerminationHandlerImpl.java   | 290 +++++++++++++++++++++
 .../StopWithSavepointTerminationManager.java       |  82 ++++++
 .../checkpoint/TestingCheckpointScheduling.java}   |  31 ++-
 .../runtime/scheduler/DefaultSchedulerTest.java    | 132 ++++++++--
 .../runtime/scheduler/TestingSchedulerNG.java      |  19 +-
 ...topWithSavepointTerminationHandlerImplTest.java | 227 ++++++++++++++++
 .../StopWithSavepointTerminationManagerTest.java   | 123 +++++++++
 .../streaming/runtime/tasks/SourceStreamTask.java  |  26 +-
 .../test/util/MiniClusterWithClientResource.java   |   1 +
 .../flink/test/checkpointing/SavepointITCase.java  | 210 +++++++++++++++
 13 files changed, 1225 insertions(+), 109 deletions(-)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/{CheckpointPlanCalculator.java => CheckpointScheduling.java} (69%)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandler.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationManager.java
 copy flink-runtime/src/{main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java => test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointScheduling.java} (53%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationManagerTest.java


[flink] 04/05: [FLINK-21030][runtime] Adds trigger for global failover

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f384b32877979d1118c0478bacae622ac7e0b330
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Tue Feb 9 13:41:53 2021 +0100

    [FLINK-21030][runtime] Adds trigger for global failover
    
    We cannot assume that the termination future completes successfully when
    triggering a synchronous savepoint: the savepoint creation may succeed while
    the graceful shutdown of the job fails. In that case the job termination does
    not complete and the job may end up in an inconsistent state. Therefore, a
    global failover is triggered if the scheduler observes any of the current
    executions ending up in a non-finished state.
    
    Unit tests and integration tests are added to cover this use-case.
    
    This closes #14847.
---
 .../runtime/checkpoint/CheckpointScheduling.java   |  32 +++
 .../flink/runtime/scheduler/SchedulerBase.java     | 103 ++++----
 .../StopWithSavepointTerminationHandler.java       |  71 +++++
 .../StopWithSavepointTerminationHandlerImpl.java   | 290 +++++++++++++++++++++
 .../StopWithSavepointTerminationManager.java       |  82 ++++++
 .../checkpoint/TestingCheckpointScheduling.java    |  48 ++++
 .../runtime/scheduler/DefaultSchedulerTest.java    |  33 +++
 .../runtime/scheduler/TestingSchedulerNG.java      |  19 +-
 ...topWithSavepointTerminationHandlerImplTest.java | 227 ++++++++++++++++
 .../StopWithSavepointTerminationManagerTest.java   | 123 +++++++++
 .../flink/test/checkpointing/SavepointITCase.java  | 210 +++++++++++++++
 11 files changed, 1186 insertions(+), 52 deletions(-)
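
Before diving into the patch, the core decision this commit introduces can be
summarized in a few lines: once the savepoint has been created, the terminal
states of all executions are inspected, and anything other than FINISHED leads
to a global failover. The following Java sketch is purely illustrative (the
wrapper class and method names are made up); the actual logic lives in
StopWithSavepointTerminationHandlerImpl further down in the patch.

    // Illustrative sketch only: mirrors the behaviour added by this commit,
    // not its exact code. The Flink types used here exist in the codebase;
    // the wrapper class and method are hypothetical.
    import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
    import org.apache.flink.runtime.execution.ExecutionState;
    import org.apache.flink.runtime.scheduler.SchedulerNG;
    import org.apache.flink.util.FlinkException;

    import java.util.Collection;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;

    final class GlobalFailoverSketch {

        static void onExecutionsTerminated(
                Collection<ExecutionState> terminalStates,
                CompletedCheckpoint completedSavepoint,
                SchedulerNG scheduler,
                CompletableFuture<String> result) {
            // Collect every execution that did not reach FINISHED.
            Set<ExecutionState> notFinished =
                    terminalStates.stream()
                            .filter(state -> state != ExecutionState.FINISHED)
                            .collect(Collectors.toSet());
            if (notFinished.isEmpty()) {
                // Clean shutdown: expose the savepoint path as the result.
                result.complete(completedSavepoint.getExternalPointer());
            } else {
                // The savepoint exists but the job did not shut down cleanly:
                // trigger a global failover and fail the stop-with-savepoint call.
                FlinkException cause =
                        new FlinkException(
                                "Executions ended in states "
                                        + notFinished
                                        + " instead of FINISHED.");
                scheduler.handleGlobalFailure(cause);
                result.completeExceptionally(cause);
            }
        }
    }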

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointScheduling.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointScheduling.java
new file mode 100644
index 0000000..86c19d1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointScheduling.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * {@code CheckpointScheduling} provides methods for starting and stopping the periodic scheduling
+ * of checkpoints.
+ */
+public interface CheckpointScheduling {
+
+    /** Starts the periodic scheduling if possible. */
+    void startCheckpointScheduler();
+
+    /** Stops the periodic scheduling if possible. */
+    void stopCheckpointScheduler();
+}
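
The interface above only exposes pausing and resuming of periodic
checkpointing, so callers such as the stop-with-savepoint handler can resume
checkpointing on failure without depending on the CheckpointCoordinator
directly. A minimal, hypothetical use (the helper class and method below are
made up and not part of this patch) could look like:

    // Hypothetical helper: pause periodic checkpointing around an operation and
    // resume it only if that operation fails (the job keeps running in that case).
    import org.apache.flink.runtime.checkpoint.CheckpointScheduling;

    import java.util.concurrent.CompletableFuture;

    final class CheckpointSchedulingSketch {

        static CompletableFuture<String> runWithPausedCheckpointing(
                CheckpointScheduling scheduling, CompletableFuture<String> operation) {
            // No periodic checkpoints while the operation is in flight.
            scheduling.stopCheckpointScheduler();
            return operation.whenComplete(
                    (path, failure) -> {
                        if (failure != null) {
                            // The operation failed, so periodic checkpointing must resume.
                            scheduling.startCheckpointScheduler();
                        }
                    });
        }
    }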
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 41988ad..07765c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -81,6 +82,8 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
@@ -112,12 +115,13 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** Base class which can be used to implement {@link SchedulerNG}. */
-public abstract class SchedulerBase implements SchedulerNG {
+public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling {
 
     private final Logger log;
 
@@ -858,7 +862,7 @@ public abstract class SchedulerBase implements SchedulerNG {
                 jobGraph.getJobID());
 
         if (cancelJob) {
-            checkpointCoordinator.stopCheckpointScheduler();
+            stopCheckpointScheduler();
         }
 
         return checkpointCoordinator
@@ -868,7 +872,7 @@ public abstract class SchedulerBase implements SchedulerNG {
                         (path, throwable) -> {
                             if (throwable != null) {
                                 if (cancelJob) {
-                                    startCheckpointScheduler(checkpointCoordinator);
+                                    startCheckpointScheduler();
                                 }
                                 throw new CompletionException(throwable);
                             } else if (cancelJob) {
@@ -883,10 +887,26 @@ public abstract class SchedulerBase implements SchedulerNG {
                         mainThreadExecutor);
     }
 
-    private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
+    @Override
+    public void stopCheckpointScheduler() {
+        final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
+        if (checkpointCoordinator == null) {
+            log.info(
+                    "Periodic checkpoint scheduling could not be stopped due to the CheckpointCoordinator being shutdown.");
+        } else {
+            checkpointCoordinator.stopCheckpointScheduler();
+        }
+    }
+
+    @Override
+    public void startCheckpointScheduler() {
         mainThreadExecutor.assertRunningInMainThread();
+        final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
 
-        if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
+        if (checkpointCoordinator == null) {
+            log.info(
+                    "Periodic checkpoint scheduling could not be started due to the CheckpointCoordinator being shutdown.");
+        } else if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
             try {
                 checkpointCoordinator.startCheckpointScheduler();
             } catch (IllegalStateException ignored) {
@@ -954,51 +974,36 @@ public abstract class SchedulerBase implements SchedulerNG {
         // to have only the data of the synchronous savepoint committed.
         // in case of failure, and if the job restarts, the coordinator
         // will be restarted by the CheckpointCoordinatorDeActivator.
-        checkpointCoordinator.stopCheckpointScheduler();
-
-        final CompletableFuture<String> savepointFuture =
-                checkpointCoordinator
-                        .triggerSynchronousSavepoint(terminate, targetDirectory)
-                        .thenApply(CompletedCheckpoint::getExternalPointer);
-
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
-                        .handle(
-                                (jobstatus, throwable) -> {
-                                    if (throwable != null) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: {}",
-                                                jobGraph.getJobID(),
-                                                throwable.getMessage());
-                                        throw new CompletionException(throwable);
-                                    } else if (jobstatus != JobStatus.FINISHED) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.",
-                                                jobGraph.getJobID(),
-                                                jobstatus);
-                                        throw new CompletionException(
-                                                new FlinkException(
-                                                        "Reached state "
-                                                                + jobstatus
-                                                                + " instead of FINISHED."));
-                                    }
-                                    return jobstatus;
-                                });
-
-        return savepointFuture
-                .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path)))
-                .handleAsync(
-                        (path, throwable) -> {
-                            if (throwable != null) {
-                                // restart the checkpoint coordinator if stopWithSavepoint failed.
-                                startCheckpointScheduler(checkpointCoordinator);
-                                throw new CompletionException(throwable);
-                            }
+        stopCheckpointScheduler();
 
-                            return path;
-                        },
-                        mainThreadExecutor);
+        final CompletableFuture<Collection<ExecutionState>> executionTerminationsFuture =
+                getCombinedExecutionTerminationFuture();
+
+        final CompletableFuture<CompletedCheckpoint> savepointFuture =
+                checkpointCoordinator.triggerSynchronousSavepoint(terminate, targetDirectory);
+
+        final StopWithSavepointTerminationManager stopWithSavepointTerminationManager =
+                new StopWithSavepointTerminationManager(
+                        new StopWithSavepointTerminationHandlerImpl(
+                                jobGraph.getJobID(), this, log));
+
+        return stopWithSavepointTerminationManager.stopWithSavepoint(
+                savepointFuture, executionTerminationsFuture, mainThreadExecutor);
+    }
+
+    /**
+     * Returns a {@code CompletableFuture} collecting the termination states of all {@link Execution
+     * Executions} of the underlying {@link ExecutionGraph}.
+     *
+     * @return a {@code CompletableFuture} that completes after all underlying {@code Executions}
+     *     have been terminated.
+     */
+    private CompletableFuture<Collection<ExecutionState>> getCombinedExecutionTerminationFuture() {
+        return FutureUtils.combineAll(
+                StreamSupport.stream(executionGraph.getAllExecutionVertices().spliterator(), false)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .map(Execution::getTerminalStateFuture)
+                        .collect(Collectors.toList()));
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandler.java
new file mode 100644
index 0000000..659819c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.execution.ExecutionState;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code StopWithSavepointTerminationHandler} handles the termination steps necessary for the
+ * stop-with-savepoint operation to finish. The order of the terminations matters:
+ *
+ * <ol>
+ *   <li>Creating a savepoint needs to be completed
+ *   <li>Waiting for the executions of the underlying job to finish
+ * </ol>
+ */
+public interface StopWithSavepointTerminationHandler {
+
+    /**
+     * Returns a {@code CompletableFuture} referring to the result of the stop-with-savepoint
+     * operation.
+     *
+     * @return the {@code CompletableFuture} containing the path to the created savepoint in case of
+     *     success.
+     */
+    CompletableFuture<String> getSavepointPath();
+
+    /**
+     * Handles the result of a {@code CompletableFuture} holding a {@link CompletedCheckpoint}. Only
+     * one of the two parameters is allowed to be set.
+     *
+     * @param completedSavepoint the {@code CompletedCheckpoint} referring to the created savepoint
+     * @param throwable an error that was caught during savepoint creation
+     * @throws IllegalArgumentException if {@code throwable} and {@code completedSavepoint} are set
+     * @throws NullPointerException if none of the parameters is set
+     */
+    void handleSavepointCreation(
+            @Nullable CompletedCheckpoint completedSavepoint, @Nullable Throwable throwable);
+
+    /**
+     * Handles the termination of the job based on the passed terminated {@link ExecutionState
+     * ExecutionStates}. stop-with-savepoint expects the {@code terminatedExecutionStates} to only
+     * contain {@link ExecutionState#FINISHED} to succeed.
+     *
+     * @param terminatedExecutionStates The terminated {@code ExecutionStates} of the underlying
+     *     job.
+     * @throws NullPointerException if {@code null} is passed.
+     */
+    void handleExecutionsTermination(Collection<ExecutionState> terminatedExecutionStates);
+}
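
The ordering contract above is easiest to read as a call sequence. A minimal,
hypothetical driver (the class and method below are made up; in the patch this
order is enforced by StopWithSavepointTerminationManager) would look roughly
like this:

    // Hypothetical driver illustrating the handler contract: savepoint creation
    // is handled first, execution termination second, and the overall result is
    // exposed via getSavepointPath().
    import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
    import org.apache.flink.runtime.execution.ExecutionState;
    import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandler;

    import java.util.Collection;
    import java.util.concurrent.CompletableFuture;

    final class HandlerContractSketch {

        static CompletableFuture<String> drive(
                StopWithSavepointTerminationHandler handler,
                CompletedCheckpoint completedSavepoint,
                Collection<ExecutionState> terminalStates) {
            // Step 1: the savepoint result must be handled first ...
            handler.handleSavepointCreation(completedSavepoint, null);
            // Step 2: ... and the terminal execution states only afterwards.
            handler.handleExecutionsTermination(terminalStates);
            // Completes with the savepoint path, or exceptionally on failure.
            return handler.getSavepointPath();
        }
    }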
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
new file mode 100644
index 0000000..aec32d3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * <p>The operation only succeeds if both steps, the savepoint creation and the successful
+ * termination of the job, succeed. If the former step fails, the operation fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is triggered before
+ * failing the operation.
+ *
+ * <p>The implementation expects the savepoint creation to be completed before the executions
+ * terminate.
+ *
+ * @see StopWithSavepointTerminationManager
+ */
+public class StopWithSavepointTerminationHandlerImpl
+        implements StopWithSavepointTerminationHandler {
+
+    private final Logger log;
+
+    private final SchedulerNG scheduler;
+    private final CheckpointScheduling checkpointScheduling;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private State state = new WaitingForSavepoint();
+
+    public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl(
+            JobID jobId, S schedulerWithCheckpointing, Logger log) {
+        this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log);
+    }
+
+    @VisibleForTesting
+    StopWithSavepointTerminationHandlerImpl(
+            JobID jobId,
+            SchedulerNG scheduler,
+            CheckpointScheduling checkpointScheduling,
+            Logger log) {
+        this.jobId = checkNotNull(jobId);
+        this.scheduler = checkNotNull(scheduler);
+        this.checkpointScheduling = checkNotNull(checkpointScheduling);
+        this.log = checkNotNull(log);
+    }
+
+    @Override
+    public CompletableFuture<String> getSavepointPath() {
+        return result;
+    }
+
+    @Override
+    public void handleSavepointCreation(
+            CompletedCheckpoint completedSavepoint, Throwable throwable) {
+        if (throwable != null) {
+            checkArgument(
+                    completedSavepoint == null,
+                    "No savepoint should be provided if a throwable is passed.");
+            handleSavepointCreationFailure(throwable);
+        } else {
+            handleSavepointCreationSuccess(checkNotNull(completedSavepoint));
+        }
+    }
+
+    @Override
+    public void handleExecutionsTermination(Collection<ExecutionState> terminatedExecutionStates) {
+        final Set<ExecutionState> notFinishedExecutionStates =
+                checkNotNull(terminatedExecutionStates).stream()
+                        .filter(state -> state != ExecutionState.FINISHED)
+                        .collect(Collectors.toSet());
+
+        if (notFinishedExecutionStates.isEmpty()) {
+            handleExecutionsFinished();
+        } else {
+            handleAnyExecutionNotFinished(notFinishedExecutionStates);
+        }
+    }
+
+    private void handleSavepointCreationSuccess(CompletedCheckpoint completedCheckpoint) {
+        final State oldState = state;
+        state = state.onSavepointCreation(completedCheckpoint);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint creation handling for job {}.",
+                oldState.getName(),
+                state.getName(),
+                jobId);
+    }
+
+    private void handleSavepointCreationFailure(Throwable throwable) {
+        final State oldState = state;
+        state = state.onSavepointCreationFailure(throwable);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint creation failure handling for job {}.",
+                oldState.getName(),
+                state.getName(),
+                jobId);
+    }
+
+    private void handleExecutionsFinished() {
+        final State oldState = state;
+        state = state.onExecutionsFinished();
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on execution termination handling with all executions being finished for job {}.",
+                oldState.getName(),
+                state.getName(),
+                jobId);
+    }
+
+    private void handleAnyExecutionNotFinished(Set<ExecutionState> notFinishedExecutionStates) {
+        final State oldState = state;
+        state = state.onAnyExecutionNotFinished(notFinishedExecutionStates);
+
+        log.warn(
+                "Stop-with-savepoint transitioned from {} to {} on execution termination handling for job {} with some executions being in an not-finished state: {}",
+                oldState.getName(),
+                state.getName(),
+                jobId,
+                notFinishedExecutionStates);
+    }
+
+    /**
+     * Handles the termination of the {@code StopWithSavepointTerminationHandler} exceptionally
+     * after triggering a global job fail-over.
+     *
+     * @param unfinishedExecutionStates the unfinished states that caused the failure.
+     * @param savepointPath the path to the successfully created savepoint.
+     */
+    private void terminateExceptionallyWithGlobalFailover(
+            Iterable<ExecutionState> unfinishedExecutionStates, String savepointPath) {
+        String errorMessage =
+                String.format(
+                        "Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. A global fail-over is triggered to recover the job %s.",
+                        StringUtils.join(unfinishedExecutionStates, ", "), jobId);
+        FlinkException inconsistentFinalStateException = new FlinkException(errorMessage);
+
+        log.warn(
+                "A savepoint was created at {} but the corresponding job {} didn't terminate successfully.",
+                savepointPath,
+                jobId,
+                inconsistentFinalStateException);
+
+        scheduler.handleGlobalFailure(inconsistentFinalStateException);
+
+        result.completeExceptionally(inconsistentFinalStateException);
+    }
+
+    /**
+     * Handles the termination of the {@code StopWithSavepointTerminationHandler} exceptionally
+     * without triggering a global job fail-over. The checkpoint scheduling is restarted before
+     * the operation is completed exceptionally.
+     *
+     * @param throwable the error that caused the exceptional termination.
+     */
+    private void terminateExceptionally(Throwable throwable) {
+        checkpointScheduling.startCheckpointScheduler();
+        result.completeExceptionally(throwable);
+    }
+
+    /**
+     * Handles the successful termination of the {@code StopWithSavepointTerminationHandler}.
+     *
+     * @param completedSavepoint the completed savepoint
+     */
+    private void terminateSuccessfully(CompletedCheckpoint completedSavepoint) {
+        result.complete(completedSavepoint.getExternalPointer());
+    }
+
+    private final class WaitingForSavepoint implements State {
+
+        @Override
+        public State onSavepointCreation(CompletedCheckpoint completedSavepoint) {
+            return new SavepointCreated(completedSavepoint);
+        }
+
+        @Override
+        public State onSavepointCreationFailure(Throwable throwable) {
+            terminateExceptionally(throwable);
+            return new FinalState();
+        }
+    }
+
+    private final class SavepointCreated implements State {
+
+        private final CompletedCheckpoint completedSavepoint;
+
+        private SavepointCreated(CompletedCheckpoint completedSavepoint) {
+            this.completedSavepoint = completedSavepoint;
+        }
+
+        @Override
+        public State onExecutionsFinished() {
+            terminateSuccessfully(completedSavepoint);
+            return new FinalState();
+        }
+
+        @Override
+        public State onAnyExecutionNotFinished(
+                Iterable<ExecutionState> notFinishedExecutionStates) {
+            terminateExceptionallyWithGlobalFailover(
+                    notFinishedExecutionStates, completedSavepoint.getExternalPointer());
+            return new FinalState();
+        }
+    }
+
+    private static final class FinalState implements State {
+
+        @Override
+        public State onExecutionsFinished() {
+            return this;
+        }
+
+        @Override
+        public State onAnyExecutionNotFinished(
+                Iterable<ExecutionState> notFinishedExecutionStates) {
+            return this;
+        }
+    }
+
+    private interface State {
+
+        default State onSavepointCreation(CompletedCheckpoint completedSavepoint) {
+            throw new UnsupportedOperationException(
+                    this.getClass().getSimpleName()
+                            + " state does not support onSavepointCreation.");
+        }
+
+        default State onSavepointCreationFailure(Throwable throwable) {
+            throw new UnsupportedOperationException(
+                    this.getClass().getSimpleName()
+                            + " state does not support onSavepointCreationFailure.");
+        }
+
+        default State onExecutionsFinished() {
+            throw new UnsupportedOperationException(
+                    this.getClass().getSimpleName()
+                            + " state does not support onExecutionsFinished.");
+        }
+
+        default State onAnyExecutionNotFinished(
+                Iterable<ExecutionState> notFinishedExecutionStates) {
+            throw new UnsupportedOperationException(
+                    this.getClass().getSimpleName()
+                            + " state does not support onAnyExecutionNotFinished.");
+        }
+
+        default String getName() {
+            return this.getClass().getSimpleName();
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationManager.java
new file mode 100644
index 0000000..2a651c5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationManager.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code StopWithSavepointTerminationManager} fulfills the contract given by {@link
+ * StopWithSavepointTerminationHandler} to run the stop-with-savepoint steps in a specific order.
+ */
+public class StopWithSavepointTerminationManager {
+
+    private final StopWithSavepointTerminationHandler stopWithSavepointTerminationHandler;
+
+    public StopWithSavepointTerminationManager(
+            StopWithSavepointTerminationHandler stopWithSavepointTerminationHandler) {
+        this.stopWithSavepointTerminationHandler =
+                Preconditions.checkNotNull(stopWithSavepointTerminationHandler);
+    }
+
+    /**
+     * Enforces the correct completion order of the passed {@code CompletableFuture} instances in
+     * accordance with the contract of {@link StopWithSavepointTerminationHandler}.
+     *
+     * @param completedSavepointFuture The {@code CompletableFuture} of the savepoint creation step.
+     * @param terminatedExecutionStatesFuture The {@code CompletableFuture} of the termination step.
+     * @param mainThreadExecutor The executor the {@code StopWithSavepointTerminationHandler}
+     *     operations run on.
+     * @return A {@code CompletableFuture} containing the path to the created savepoint.
+     */
+    public CompletableFuture<String> stopWithSavepoint(
+            CompletableFuture<CompletedCheckpoint> completedSavepointFuture,
+            CompletableFuture<Collection<ExecutionState>> terminatedExecutionStatesFuture,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        FutureUtils.assertNoException(
+                completedSavepointFuture
+                        // the completedSavepointFuture could also be completed by
+                        // CheckpointCanceller which doesn't run in the mainThreadExecutor
+                        .handleAsync(
+                                (completedSavepoint, throwable) -> {
+                                    stopWithSavepointTerminationHandler.handleSavepointCreation(
+                                            completedSavepoint, throwable);
+                                    return null;
+                                },
+                                mainThreadExecutor)
+                        .thenRun(
+                                () ->
+                                        FutureUtils.assertNoException(
+                                                // the execution termination has to run in a
+                                                // separate Runnable to disconnect it from any
+                                                // previous task failure handling
+                                                terminatedExecutionStatesFuture.thenAcceptAsync(
+                                                        stopWithSavepointTerminationHandler
+                                                                ::handleExecutionsTermination,
+                                                        mainThreadExecutor))));
+
+        return stopWithSavepointTerminationHandler.getSavepointPath();
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointScheduling.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointScheduling.java
new file mode 100644
index 0000000..728ae40
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointScheduling.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * {@code TestingCheckpointScheduling} is a basic testing implementation of {@link
+ * CheckpointScheduling} that provides a flag indicating whether checkpoint scheduling is enabled.
+ */
+public class TestingCheckpointScheduling implements CheckpointScheduling {
+
+    private final AtomicBoolean checkpointSchedulingEnabled;
+
+    public TestingCheckpointScheduling(boolean initialState) {
+        checkpointSchedulingEnabled = new AtomicBoolean(initialState);
+    }
+
+    @Override
+    public void startCheckpointScheduler() {
+        checkpointSchedulingEnabled.set(true);
+    }
+
+    @Override
+    public void stopCheckpointScheduler() {
+        checkpointSchedulingEnabled.set(false);
+    }
+
+    public boolean isEnabled() {
+        return checkpointSchedulingEnabled.get();
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 7462412..ffbbe01 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -107,6 +107,7 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -647,6 +648,38 @@ public class DefaultSchedulerTest extends TestLogger {
     }
 
     @Test
+    public void testStartingCheckpointSchedulerAfterExecutionGraphFinished() {
+        assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(
+                SchedulerBase::startCheckpointScheduler);
+    }
+
+    @Test
+    public void testStoppingCheckpointSchedulerAfterExecutionGraphFinished() {
+        assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(
+                SchedulerBase::stopCheckpointScheduler);
+    }
+
+    private void assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(
+            Consumer<DefaultScheduler> callSchedulingOperation) {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        enableCheckpointing(jobGraph);
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+        assertThat(scheduler.getCheckpointCoordinator(), is(notNullValue()));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        Iterables.getOnlyElement(
+                                        scheduler.getExecutionGraph().getAllExecutionVertices())
+                                .getCurrentExecutionAttempt()
+                                .getAttemptId(),
+                        ExecutionState.FINISHED));
+
+        assertThat(scheduler.getCheckpointCoordinator(), is(nullValue()));
+        callSchedulingOperation.accept(scheduler);
+        assertThat(scheduler.getCheckpointCoordinator(), is(nullValue()));
+    }
+
+    @Test
     public void vertexIsNotAffectedByOutdatedDeployment() {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 831e21d..2527e20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -55,16 +55,19 @@ public class TestingSchedulerNG implements SchedulerNG {
     private final Runnable startSchedulingRunnable;
     private final Consumer<Throwable> suspendConsumer;
     private final BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction;
+    private final Consumer<Throwable> handleGlobalFailureConsumer;
 
     private TestingSchedulerNG(
             CompletableFuture<Void> terminationFuture,
             Runnable startSchedulingRunnable,
             Consumer<Throwable> suspendConsumer,
-            BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction) {
+            BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction,
+            Consumer<Throwable> handleGlobalFailureConsumer) {
         this.terminationFuture = terminationFuture;
         this.startSchedulingRunnable = startSchedulingRunnable;
         this.suspendConsumer = suspendConsumer;
         this.triggerSavepointFunction = triggerSavepointFunction;
+        this.handleGlobalFailureConsumer = handleGlobalFailureConsumer;
     }
 
     @Override
@@ -90,7 +93,9 @@ public class TestingSchedulerNG implements SchedulerNG {
     }
 
     @Override
-    public void handleGlobalFailure(Throwable cause) {}
+    public void handleGlobalFailure(Throwable cause) {
+        handleGlobalFailureConsumer.accept(cause);
+    }
 
     @Override
     public boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState) {
@@ -224,6 +229,7 @@ public class TestingSchedulerNG implements SchedulerNG {
         private Consumer<Throwable> suspendConsumer = ignored -> {};
         private BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction =
                 (ignoredA, ignoredB) -> new CompletableFuture<>();
+        private Consumer<Throwable> handleGlobalFailureConsumer = (ignored) -> {};
 
         public Builder setTerminationFuture(CompletableFuture<Void> terminationFuture) {
             this.terminationFuture = terminationFuture;
@@ -246,12 +252,19 @@ public class TestingSchedulerNG implements SchedulerNG {
             return this;
         }
 
+        public Builder setHandleGlobalFailureConsumer(
+                Consumer<Throwable> handleGlobalFailureConsumer) {
+            this.handleGlobalFailureConsumer = handleGlobalFailureConsumer;
+            return this;
+        }
+
         public TestingSchedulerNG build() {
             return new TestingSchedulerNG(
                     terminationFuture,
                     startSchedulingRunnable,
                     suspendConsumer,
-                    triggerSavepointFunction);
+                    triggerSavepointFunction,
+                    handleGlobalFailureConsumer);
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
new file mode 100644
index 0000000..ce1ec5b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImplTest} tests {@link
+ * StopWithSavepointTerminationHandlerImpl}.
+ */
+public class StopWithSavepointTerminationHandlerImplTest extends TestLogger {
+
+    private static final JobID JOB_ID = new JobID();
+
+    private final TestingCheckpointScheduling checkpointScheduling =
+            new TestingCheckpointScheduling(false);
+
+    private StopWithSavepointTerminationHandlerImpl createTestInstanceFailingOnGlobalFailOver() {
+        return createTestInstance(
+                throwableCausingGlobalFailOver -> fail("No global failover should be triggered."));
+    }
+
+    private StopWithSavepointTerminationHandlerImpl createTestInstance(
+            Consumer<Throwable> handleGlobalFailureConsumer) {
+        // checkpointing should always be stopped before initiating stop-with-savepoint
+        checkpointScheduling.stopCheckpointScheduler();
+
+        final SchedulerNG scheduler =
+                TestingSchedulerNG.newBuilder()
+                        .setHandleGlobalFailureConsumer(handleGlobalFailureConsumer)
+                        .build();
+        return new StopWithSavepointTerminationHandlerImpl(
+                JOB_ID, scheduler, checkpointScheduling, log);
+    }
+
+    @Test
+    public void testHappyPath() throws ExecutionException, InterruptedException {
+        final StopWithSavepointTerminationHandlerImpl testInstance =
+                createTestInstanceFailingOnGlobalFailOver();
+
+        final EmptyStreamStateHandle streamStateHandle = new EmptyStreamStateHandle();
+        final CompletedCheckpoint completedSavepoint = createCompletedSavepoint(streamStateHandle);
+        testInstance.handleSavepointCreation(completedSavepoint, null);
+        testInstance.handleExecutionsTermination(Collections.singleton(ExecutionState.FINISHED));
+
+        assertThat(
+                testInstance.getSavepointPath().get(), is(completedSavepoint.getExternalPointer()));
+
+        assertFalse(
+                "The savepoint should not have been discarded.", streamStateHandle.isDisposed());
+        assertFalse("Checkpoint scheduling should be disabled.", checkpointScheduling.isEnabled());
+    }
+
+    @Test
+    public void testSavepointCreationFailureWithoutExecutionTermination() {
+        // savepoint creation failure is handled as expected if no execution termination happens
+        assertSavepointCreationFailure(testInstance -> {});
+    }
+
+    @Test
+    public void testSavepointCreationFailureWithFailingExecutions() {
+        // no global fail-over is expected to be triggered by the stop-with-savepoint despite the
+        // execution failure
+        assertSavepointCreationFailure(
+                testInstance ->
+                        testInstance.handleExecutionsTermination(
+                                Collections.singletonList(ExecutionState.FAILED)));
+    }
+
+    @Test
+    public void testSavepointCreationFailureWithFinishingExecutions() {
+        // checkpoint scheduling should be still enabled despite the finished executions
+        assertSavepointCreationFailure(
+                testInstance ->
+                        testInstance.handleExecutionsTermination(
+                                Collections.singletonList(ExecutionState.FINISHED)));
+    }
+
+    public void assertSavepointCreationFailure(
+            Consumer<StopWithSavepointTerminationHandler> handleExecutionsTermination) {
+        final StopWithSavepointTerminationHandlerImpl testInstance =
+                createTestInstanceFailingOnGlobalFailOver();
+
+        final String expectedErrorMessage = "Expected exception during savepoint creation.";
+        testInstance.handleSavepointCreation(null, new Exception(expectedErrorMessage));
+        handleExecutionsTermination.accept(testInstance);
+
+        try {
+            testInstance.getSavepointPath().get();
+            fail("An ExecutionException is expected.");
+        } catch (Throwable e) {
+            final Optional<Throwable> actualException =
+                    ExceptionUtils.findThrowableWithMessage(e, expectedErrorMessage);
+            assertTrue(
+                    "An exception with the expected error message should have been thrown.",
+                    actualException.isPresent());
+        }
+
+        // the checkpoint scheduling should be enabled in case of failure
+        assertTrue("Checkpoint scheduling should be enabled.", checkpointScheduling.isEnabled());
+    }
+
+    @Test
+    public void testFailedTerminationHandling() throws ExecutionException, InterruptedException {
+        final CompletableFuture<Throwable> globalFailOverTriggered = new CompletableFuture<>();
+        final StopWithSavepointTerminationHandlerImpl testInstance =
+                createTestInstance(globalFailOverTriggered::complete);
+
+        final ExecutionState expectedNonFinishedState = ExecutionState.FAILED;
+        final String expectedErrorMessage =
+                String.format(
+                        "Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. A global fail-over is triggered to recover the job %s.",
+                        expectedNonFinishedState, JOB_ID);
+
+        final EmptyStreamStateHandle streamStateHandle = new EmptyStreamStateHandle();
+        final CompletedCheckpoint completedSavepoint = createCompletedSavepoint(streamStateHandle);
+
+        testInstance.handleSavepointCreation(completedSavepoint, null);
+        testInstance.handleExecutionsTermination(
+                Collections.singletonList(expectedNonFinishedState));
+
+        try {
+            testInstance.getSavepointPath().get();
+            fail("An ExecutionException is expected.");
+        } catch (Throwable e) {
+            final Optional<FlinkException> actualFlinkException =
+                    ExceptionUtils.findThrowable(e, FlinkException.class);
+            assertTrue(
+                    "A FlinkException should have been thrown.", actualFlinkException.isPresent());
+            assertThat(
+                    actualFlinkException.get(),
+                    FlinkMatchers.containsMessage(expectedErrorMessage));
+        }
+
+        assertTrue("Global fail-over was not triggered.", globalFailOverTriggered.isDone());
+        assertThat(
+                globalFailOverTriggered.get(), FlinkMatchers.containsMessage(expectedErrorMessage));
+
+        assertFalse("Savepoint should not be discarded.", streamStateHandle.isDisposed());
+
+        assertFalse(
+                "Checkpoint scheduling should not be enabled in case of failure.",
+                checkpointScheduling.isEnabled());
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testInvalidExecutionTerminationCall() {
+        createTestInstanceFailingOnGlobalFailOver()
+                .handleExecutionsTermination(Collections.singletonList(ExecutionState.FINISHED));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testSavepointCreationParameterBothNull() {
+        createTestInstanceFailingOnGlobalFailOver().handleSavepointCreation(null, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSavepointCreationParameterBothSet() {
+        createTestInstanceFailingOnGlobalFailOver()
+                .handleSavepointCreation(
+                        createCompletedSavepoint(new EmptyStreamStateHandle()),
+                        new Exception(
+                                "No exception should be passed if a savepoint is available."));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testExecutionTerminationWithNull() {
+        createTestInstanceFailingOnGlobalFailOver().handleExecutionsTermination(null);
+    }
+
+    private static CompletedCheckpoint createCompletedSavepoint(
+            StreamStateHandle streamStateHandle) {
+        return new CompletedCheckpoint(
+                JOB_ID,
+                0,
+                0L,
+                0L,
+                new HashMap<>(),
+                null,
+                CheckpointProperties.forSavepoint(true),
+                new TestCompletedCheckpointStorageLocation(streamStateHandle, "savepoint-path"));
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationManagerTest.java
new file mode 100644
index 0000000..cea9bc6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationManagerTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * {@code StopWithSavepointTerminationManagerTest} tests that {@link
+ * StopWithSavepointTerminationManager} applies the correct order expected by {@link
+ * StopWithSavepointTerminationHandler} regardless of the completion of the provided {@code
+ * CompletableFutures}.
+ */
+public class StopWithSavepointTerminationManagerTest extends TestLogger {
+
+    @Test
+    public void testCompletionInCorrectOrder() {
+        assertCorrectOrderOfProcessing(
+                (completedSavepointFuture, terminatedExecutionStatesFuture) -> {
+                    completedSavepointFuture.complete(null);
+                    terminatedExecutionStatesFuture.complete(null);
+                });
+    }
+
+    @Test
+    public void testCompletionInInverseOrder() {
+        assertCorrectOrderOfProcessing(
+                (completedSavepointFuture, terminatedExecutionStatesFuture) -> {
+                    terminatedExecutionStatesFuture.complete(null);
+                    completedSavepointFuture.complete(null);
+                });
+    }
+
+    private void assertCorrectOrderOfProcessing(
+            BiConsumer<CompletableFuture<CompletedCheckpoint>, CompletableFuture<ExecutionState>>
+                    completion) {
+        final CompletableFuture<CompletedCheckpoint> completedSavepointFuture =
+                new CompletableFuture<>();
+        final CompletableFuture<ExecutionState> terminatedExecutionStateFuture =
+                new CompletableFuture<>();
+
+        final TestingStopWithSavepointTerminationHandler stopWithSavepointTerminationHandler =
+                new TestingStopWithSavepointTerminationHandler();
+        new StopWithSavepointTerminationManager(stopWithSavepointTerminationHandler)
+                .stopWithSavepoint(
+                        completedSavepointFuture,
+                        terminatedExecutionStateFuture.thenApply(Collections::singleton),
+                        ComponentMainThreadExecutorServiceAdapter.forMainThread());
+        completion.accept(completedSavepointFuture, terminatedExecutionStateFuture);
+
+        assertThat(
+                stopWithSavepointTerminationHandler.getActualMethodCallOrder(),
+                CoreMatchers.is(
+                        Arrays.asList(
+                                MethodCall.SavepointCreationTermination,
+                                MethodCall.ExecutionTermination)));
+    }
+
+    private enum MethodCall {
+        SavepointCreationTermination,
+        ExecutionTermination
+    }
+
+    private static class TestingStopWithSavepointTerminationHandler
+            implements StopWithSavepointTerminationHandler {
+
+        private final List<MethodCall> methodCalls = new ArrayList<>(2);
+
+        @Override
+        public CompletableFuture<String> getSavepointPath() {
+            return FutureUtils.completedExceptionally(
+                    new Exception("The result is not relevant in this test."));
+        }
+
+        @Override
+        public void handleSavepointCreation(
+                CompletedCheckpoint completedSavepoint, Throwable throwable) {
+            methodCalls.add(MethodCall.SavepointCreationTermination);
+        }
+
+        @Override
+        public void handleExecutionsTermination(
+                Collection<ExecutionState> terminatedExecutionStates) {
+            methodCalls.add(MethodCall.ExecutionTermination);
+        }
+
+        public List<MethodCall> getActualMethodCallOrder() {
+            return methodCalls;
+        }
+    }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index ffef666..b2cdb4d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -35,18 +35,31 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -66,6 +79,7 @@ import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
 import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.Description;
@@ -100,12 +114,15 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.concurrent.CompletableFuture.allOf;
+import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN;
 import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -542,6 +559,165 @@ public class SavepointITCase extends TestLogger {
         }
     }
 
+    @Test
+    public void testStopWithSavepointFailingInSnapshotCreation() throws Exception {
+        testStopWithFailingSourceInOnePipeline(
+                new SnapshotFailingInfiniteTestSource(),
+                folder.newFolder(),
+                // two restarts expected:
+                // 1. task failure restart
+                // 2. job failover triggered by the CheckpointFailureManager
+                2,
+                assertInSnapshotCreationFailure());
+    }
+
+    @Test
+    public void testStopWithSavepointFailingAfterSnapshotCreation() throws Exception {
+        testStopWithFailingSourceInOnePipeline(
+                new CancelFailingInfiniteTestSource(),
+                folder.newFolder(),
+                // two restarts expected:
+                // 1. task failure restart
+                // 2. job failover triggered by SchedulerBase.stopWithSavepoint
+                2,
+                assertAfterSnapshotCreationFailure());
+    }
+
+    private static BiConsumer<JobID, ExecutionException> assertAfterSnapshotCreationFailure() {
+        return (jobId, actualException) -> {
+            Optional<FlinkException> actualFlinkException =
+                    ExceptionUtils.findThrowable(actualException, FlinkException.class);
+            assertTrue(actualFlinkException.isPresent());
+            assertThat(
+                    actualFlinkException.get(),
+                    containsMessage(
+                            String.format(
+                                    "Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: FAILED. A global fail-over is triggered to recover the job %s.",
+                                    jobId)));
+        };
+    }
+
+    private static BiConsumer<JobID, ExecutionException> assertInSnapshotCreationFailure() {
+        return (ignored, actualException) -> {
+            Optional<CheckpointException> actualFailureCause =
+                    ExceptionUtils.findThrowable(actualException, CheckpointException.class);
+            assertTrue(actualFailureCause.isPresent());
+            assertThat(
+                    actualFailureCause.get().getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.JOB_FAILOVER_REGION));
+        };
+    }
+
+    private static OneShotLatch failingPipelineLatch;
+    private static OneShotLatch succeedingPipelineLatch;
+
+    /**
+     * FLINK-21030
+     *
+     * <p>Tests the handling of a failure that happens while stopping an embarrassingly parallel
+     * job with a savepoint. The test expects the stop operation to fail and all executions to be
+     * in state {@code RUNNING} afterwards.
+     *
+     * @param failingSource the failing {@link SourceFunction} used in one of the two pipelines.
+     * @param savepointDir the folder the savepoint shall be written to.
+     * @param expectedMaximumNumberOfRestarts the maximum number of restarts allowed by the restart
+     *     strategy.
+     * @param exceptionAssertion asserts the exception returned by the client call to verify that
+     *     the right error was handled.
+     * @see SavepointITCase#failingPipelineLatch The latch that signals the successful start of the
+     *     pipeline that fails later on.
+     * @see SavepointITCase#succeedingPipelineLatch The latch that signals the successful start of
+     *     the succeeding pipeline.
+     * @throws Exception if an error occurred while running the test.
+     */
+    private static void testStopWithFailingSourceInOnePipeline(
+            InfiniteTestSource failingSource,
+            File savepointDir,
+            int expectedMaximumNumberOfRestarts,
+            BiConsumer<JobID, ExecutionException> exceptionAssertion)
+            throws Exception {
+        MiniClusterWithClientResource cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder().build());
+
+        failingPipelineLatch = new OneShotLatch();
+        succeedingPipelineLatch = new OneShotLatch();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.getConfig()
+                .setRestartStrategy(
+                        RestartStrategies.fixedDelayRestart(expectedMaximumNumberOfRestarts, 0));
+        env.addSource(failingSource)
+                .name("Failing Source")
+                .map(
+                        value -> {
+                            failingPipelineLatch.trigger();
+                            return value;
+                        })
+                .addSink(new DiscardingSink<>());
+        env.addSource(new InfiniteTestSource())
+                .name("Succeeding Source")
+                .map(
+                        value -> {
+                            succeedingPipelineLatch.trigger();
+                            return value;
+                        })
+                .addSink(new DiscardingSink<>());
+
+        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+        cluster.before();
+        try {
+            ClusterClient<?> client = cluster.getClusterClient();
+            client.submitJob(jobGraph).get();
+
+            // we need to wait for both pipelines to be in state RUNNING because that's the only
+            // state which allows creating a savepoint
+            failingPipelineLatch.await();
+            succeedingPipelineLatch.await();
+
+            try {
+                client.stopWithSavepoint(jobGraph.getJobID(), false, savepointDir.getAbsolutePath())
+                        .get();
+                fail("The future should fail exceptionally.");
+            } catch (ExecutionException e) {
+                exceptionAssertion.accept(jobGraph.getJobID(), e);
+            }
+
+            // access the REST endpoint of the cluster to determine the state of each
+            // ExecutionVertex
+            final RestClient restClient =
+                    new RestClient(
+                            RestClientConfiguration.fromConfiguration(
+                                    new UnmodifiableConfiguration(new Configuration())),
+                            TestingUtils.defaultExecutor());
+
+            final URI restAddress = cluster.getRestAddres();
+            final JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
+            final JobMessageParameters params = detailsHeaders.getUnresolvedMessageParameters();
+            params.jobPathParameter.resolve(jobGraph.getJobID());
+
+            CommonTestUtils.waitUntilCondition(
+                    () -> {
+                        JobDetailsInfo detailsInfo =
+                                restClient
+                                        .sendRequest(
+                                                restAddress.getHost(),
+                                                restAddress.getPort(),
+                                                detailsHeaders,
+                                                params,
+                                                EmptyRequestBody.getInstance())
+                                        .get();
+
+                        return detailsInfo.getJobVerticesPerState().get(ExecutionState.RUNNING)
+                                == 2;
+                    },
+                    Deadline.fromNow(Duration.ofSeconds(10)));
+        } finally {
+            cluster.after();
+        }
+    }
+
     /**
      * FLINK-5985
      *
@@ -753,6 +929,40 @@ public class SavepointITCase extends TestLogger {
         }
     }
 
+    /**
+     * An {@link InfiniteTestSource} implementation that fails when cancel is called for the first
+     * time.
+     */
+    private static class CancelFailingInfiniteTestSource extends InfiniteTestSource {
+
+        private static volatile boolean cancelTriggered = false;
+
+        @Override
+        public void cancel() {
+            if (!cancelTriggered) {
+                cancelTriggered = true;
+                throw new RuntimeException("Expected RuntimeException after snapshot creation.");
+            }
+            super.cancel();
+        }
+    }
+
+    /** An {@link InfiniteTestSource} implementation that fails while creating a snapshot. */
+    private static class SnapshotFailingInfiniteTestSource extends InfiniteTestSource
+            implements CheckpointedFunction {
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            throw new Exception(
+                    "Expected Exception happened during snapshot creation within test source");
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            // all good here
+        }
+    }
+
     private static class StatefulCounter extends RichMapFunction<Integer, Integer>
             implements ListCheckpointed<byte[]> {
 


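For context, the CancelFailingInfiniteTestSource and SnapshotFailingInfiniteTestSource added
above extend InfiniteTestSource, which already exists in SavepointITCase and is therefore not
part of this diff. A minimal sketch of such a source, assuming Flink's legacy
org.apache.flink.streaming.api.functions.source.SourceFunction API (the body shown here is an
illustration, not the actual test class), could look like this:

    private static class InfiniteTestSource implements SourceFunction<Integer> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            while (running) {
                // emit under the checkpoint lock so snapshots observe a consistent state
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(1);
                }
                Thread.sleep(1);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

Overriding cancel() (as CancelFailingInfiniteTestSource does) or implementing
CheckpointedFunction#snapshotState (as SnapshotFailingInfiniteTestSource does) is what lets the
tests above inject a failure either after or during the snapshot phase of the savepoint.
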
[flink] 01/05: [hotfix][tests] Adds log message to MiniClusterWithClientResource shutdown

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8b3d620d79c359e2184b110b56e8ffd9e757f14f
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Feb 10 11:07:39 2021 +0100

    [hotfix][tests] Adds log message to MiniClusterWithClientResource shutdown
---
 .../java/org/apache/flink/test/util/MiniClusterWithClientResource.java   | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
index 88bf319..62e0318 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
@@ -60,6 +60,7 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
 
     @Override
     public void after() {
+        log.info("Finalization triggered: Cluster shutdown is going to be initiated.");
         TestStreamEnvironment.unsetAsContext();
         TestEnvironment.unsetAsContext();
 


[flink] 03/05: [hotfix][test] Adds unit test for local and global failure happened concurrently

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1e99f0ad2cbd1c1bc69b11081d3ec47155dae39f
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Feb 24 13:53:53 2021 +0100

    [hotfix][test] Adds unit test for local and global failure happened concurrently
    
    This test verifies that the scheduler works as expected when a local
    failure follows a global fail-over operation before the restart has happened.
---
 .../runtime/scheduler/DefaultSchedulerTest.java    | 58 ++++++++++++++++++++++
 1 file changed, 58 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 699bb71..7462412 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -88,6 +89,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs;
 import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
@@ -588,6 +591,61 @@ public class DefaultSchedulerTest extends TestLogger {
         assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId));
     }
 
+    /**
+     * This test covers the use-case where a global fail-over is followed by a local task failure.
+     * It verifies (besides checking the expected deployments) that the assert in the global
+     * recovery handling of {@link SchedulerBase#restoreState} is not triggered due to version
+     * updates.
+     */
+    @Test
+    public void handleGlobalFailureWithLocalFailure() {
+        final JobGraph jobGraph = singleJobVertexJobGraph(2);
+        final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
+        enableCheckpointing(jobGraph);
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        final List<ExecutionAttemptID> attemptIds =
+                StreamSupport.stream(
+                                scheduler
+                                        .requestJob()
+                                        .getArchivedExecutionGraph()
+                                        .getAllExecutionVertices()
+                                        .spliterator(),
+                                false)
+                        .map(ArchivedExecutionVertex::getCurrentExecutionAttempt)
+                        .map(ArchivedExecution::getAttemptId)
+                        .collect(Collectors.toList());
+        final ExecutionAttemptID localFailureAttemptId = attemptIds.get(0);
+        scheduler.handleGlobalFailure(new Exception("global failure"));
+        // the local failure shouldn't affect the global fail-over
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        localFailureAttemptId,
+                        ExecutionState.FAILED,
+                        new Exception("local failure")));
+
+        for (ExecutionAttemptID attemptId : attemptIds) {
+            scheduler.updateTaskExecutionState(
+                    new TaskExecutionState(attemptId, ExecutionState.CANCELED));
+        }
+
+        taskRestartExecutor.triggerScheduledTasks();
+
+        final ExecutionVertexID executionVertexId0 =
+                new ExecutionVertexID(onlyJobVertex.getID(), 0);
+        final ExecutionVertexID executionVertexId1 =
+                new ExecutionVertexID(onlyJobVertex.getID(), 1);
+        assertThat(
+                "The execution vertices should be deployed in a specific order reflecting the scheduling start and the global fail-over afterwards.",
+                testExecutionVertexOperations.getDeployedVertices(),
+                contains(
+                        executionVertexId0,
+                        executionVertexId1,
+                        executionVertexId0,
+                        executionVertexId1));
+    }
+
     @Test
     public void vertexIsNotAffectedByOutdatedDeployment() {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);


[flink] 02/05: [hotfix][task] Interrupt source legacy thread on failure.

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 25ef41943a46034a65e8cdd4d9171af23c7f827b
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Feb 9 11:49:33 2021 +0100

    [hotfix][task] Interrupt source legacy thread on failure.
    
    If a legacy source task fails outside of the legacy thread, the legacy thread
    blocks proper cancellation because its completion future is never completed.
---
 .../streaming/runtime/tasks/SourceStreamTask.java  | 26 ++++++++++++++++------
 1 file changed, 19 insertions(+), 7 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 8d2edfc..0a11c8d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -179,6 +179,14 @@ public class SourceStreamTask<
     }
 
     @Override
+    protected void cleanUpInvoke() throws Exception {
+        if (isFailing()) {
+            interruptSourceThread(true);
+        }
+        super.cleanUpInvoke();
+    }
+
+    @Override
     protected void cancelTask() {
         cancelTask(true);
     }
@@ -202,14 +210,18 @@ public class SourceStreamTask<
                 mainOperator.cancel();
             }
         } finally {
-            if (sourceThread.isAlive()) {
-                if (interrupt) {
-                    sourceThread.interrupt();
-                }
-            } else if (!sourceThread.getCompletionFuture().isDone()) {
-                // source thread didn't start
-                sourceThread.getCompletionFuture().complete(null);
+            interruptSourceThread(interrupt);
+        }
+    }
+
+    private void interruptSourceThread(boolean interrupt) {
+        if (sourceThread.isAlive()) {
+            if (interrupt) {
+                sourceThread.interrupt();
             }
+        } else if (!sourceThread.getCompletionFuture().isDone()) {
+            // source thread didn't start
+            sourceThread.getCompletionFuture().complete(null);
         }
     }
 


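The interruptSourceThread change above matters because the legacy source runs on a dedicated
thread that signals its termination through a completion future; cleanup waits on that future, so
a thread that is never interrupted can block cleanup forever. A rough, self-contained sketch of
that pattern (a simplified stand-in, not the actual source thread used by SourceStreamTask) is
shown below:

    import java.util.concurrent.CompletableFuture;

    final class WorkerThreadSketch extends Thread {

        private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();

        @Override
        public void run() {
            try {
                // stand-in for running the legacy source until it finishes or is interrupted
                while (!isInterrupted()) {
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                // interruption is the expected shutdown path for this worker
            } finally {
                // complete the future so any cleanup code waiting on it can make progress
                completionFuture.complete(null);
            }
        }

        CompletableFuture<Void> getCompletionFuture() {
            return completionFuture;
        }
    }

If the owning task fails on another thread and never interrupts the worker, run() never returns
and getCompletionFuture() never completes; interrupting the worker in the failure path, as the
new cleanUpInvoke() override does, guarantees that the future is eventually completed.
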
[flink] 05/05: [hotfix][test] Removes unused jobId parameter

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a2c737891afde0c63c1a453b1ee164b80b6a702c
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Fri Feb 26 08:35:12 2021 +0100

    [hotfix][test] Removes unused jobId parameter
---
 .../runtime/scheduler/DefaultSchedulerTest.java    | 41 +++++++---------------
 1 file changed, 13 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index ffbbe01..597d71e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -286,8 +286,7 @@ public class DefaultSchedulerTest extends TestLogger {
         final ExecutionAttemptID attemptId =
                 archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
 
-        scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
 
         taskRestartExecutor.triggerScheduledTasks();
 
@@ -303,7 +302,7 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final TaskExecutionState taskExecutionState =
-                createFailedTaskExecutionState(jobGraph.getJobID(), new ExecutionAttemptID());
+                createFailedTaskExecutionState(new ExecutionAttemptID());
 
         assertFalse(scheduler.updateTaskExecutionState(taskExecutionState));
     }
@@ -324,8 +323,7 @@ public class DefaultSchedulerTest extends TestLogger {
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
 
-        scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
 
         taskRestartExecutor.triggerScheduledTasks();
 
@@ -463,8 +461,7 @@ public class DefaultSchedulerTest extends TestLogger {
                         .next();
         final ExecutionAttemptID attemptId =
                 sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
-        scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         testRestartBackoffTimeStrategy.setCanRestart(false);
 
         testExecutionSlotAllocator.enableAutoCompletePendingRequests();
@@ -523,8 +520,7 @@ public class DefaultSchedulerTest extends TestLogger {
                                 .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
-        scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
 
         taskRestartExecutor.triggerScheduledTasks();
 
@@ -701,15 +697,13 @@ public class DefaultSchedulerTest extends TestLogger {
         // fail v1 and let it recover to SCHEDULED
         // the initial deployment of v1 will be outdated
         scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(
-                        jobGraph.getJobID(), v1.getCurrentExecutionAttempt().getAttemptId()));
+                createFailedTaskExecutionState(v1.getCurrentExecutionAttempt().getAttemptId()));
         taskRestartExecutor.triggerScheduledTasks();
 
         // fail v2 to get all pending slot requests in the initial deployments to be done
         // this triggers the outdated deployment of v1
         scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(
-                        jobGraph.getJobID(), v2.getCurrentExecutionAttempt().getAttemptId()));
+                createFailedTaskExecutionState(v2.getCurrentExecutionAttempt().getAttemptId()));
 
         // v1 should not be affected
         assertThat(sv1.getState(), is(equalTo(ExecutionState.SCHEDULED)));
@@ -741,8 +735,7 @@ public class DefaultSchedulerTest extends TestLogger {
         checkpointTriggeredLatch.await();
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(1)));
 
-        scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         taskRestartExecutor.triggerScheduledTasks();
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
     }
@@ -780,8 +773,7 @@ public class DefaultSchedulerTest extends TestLogger {
                 checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
         acknowledgePendingCheckpoint(scheduler, checkpointId);
 
-        scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         taskRestartExecutor.triggerScheduledTasks();
         assertThat(masterHook.getRestoreCount(), is(equalTo(1)));
     }
@@ -821,8 +813,7 @@ public class DefaultSchedulerTest extends TestLogger {
                 checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
         acknowledgePendingCheckpoint(scheduler, checkpointId);
 
-        scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         taskRestartExecutor.triggerScheduledTasks();
         final List<ExecutionVertexID> deployedExecutionVertices =
                 testExecutionVertexOperations.getDeployedVertices();
@@ -888,7 +879,6 @@ public class DefaultSchedulerTest extends TestLogger {
     @Test
     public void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);
-        final JobID jobId = jobGraph.getJobID();
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final Iterator<ArchivedExecutionVertex> vertexIterator =
@@ -923,7 +913,6 @@ public class DefaultSchedulerTest extends TestLogger {
     @Test
     public void cancelWhileRestartingShouldWaitForRunningTasks() {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);
-        final JobID jobid = jobGraph.getJobID();
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
         final SchedulingTopology topology = scheduler.getSchedulingTopology();
 
@@ -959,7 +948,6 @@ public class DefaultSchedulerTest extends TestLogger {
     @Test
     public void failureInfoIsSetAfterTaskFailure() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
-        final JobID jobId = jobGraph.getJobID();
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ArchivedExecutionVertex onlyExecutionVertex =
@@ -985,7 +973,6 @@ public class DefaultSchedulerTest extends TestLogger {
     @Test
     public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);
-        final JobID jobId = jobGraph.getJobID();
         testExecutionSlotAllocator.disableAutoCompletePendingRequests();
 
         final DefaultScheduler scheduler =
@@ -1066,7 +1053,6 @@ public class DefaultSchedulerTest extends TestLogger {
     @Test
     public void testExceptionHistoryWithRestartableFailure() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
-        final JobID jobId = jobGraph.getJobID();
 
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
@@ -1081,7 +1067,7 @@ public class DefaultSchedulerTest extends TestLogger {
                         .getAttemptId();
         final RuntimeException restartableException = new RuntimeException("restartable exception");
         Range<Long> updateStateTriggeringRestartTimeframe =
-                initiateFailure(scheduler, jobId, restartableAttemptId, restartableException);
+                initiateFailure(scheduler, restartableAttemptId, restartableException);
 
         taskRestartExecutor.triggerNonPeriodicScheduledTask();
 
@@ -1098,7 +1084,7 @@ public class DefaultSchedulerTest extends TestLogger {
                         .getAttemptId();
         final RuntimeException failingException = new RuntimeException("failing exception");
         Range<Long> updateStateTriggeringJobFailureTimeframe =
-                initiateFailure(scheduler, jobId, failingAttemptId, failingException);
+                initiateFailure(scheduler, failingAttemptId, failingException);
 
         List<ErrorInfo> actualExceptionHistory = scheduler.getExceptionHistory();
         assertThat(actualExceptionHistory.size(), is(2));
@@ -1210,14 +1196,13 @@ public class DefaultSchedulerTest extends TestLogger {
     }
 
     private static TaskExecutionState createFailedTaskExecutionState(
-            JobID jobId, ExecutionAttemptID executionAttemptID) {
+            ExecutionAttemptID executionAttemptID) {
         return new TaskExecutionState(
                 executionAttemptID, ExecutionState.FAILED, new Exception("Expected failure cause"));
     }
 
     private static Range<Long> initiateFailure(
             DefaultScheduler scheduler,
-            JobID jobId,
             ExecutionAttemptID executionAttemptID,
             Throwable exception) {
         long start = System.currentTimeMillis();