You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/02/25 10:31:21 UTC

[flink] branch release-1.11 updated (463d9e3 -> 1196dff)

This is an automated email from the ASF dual-hosted git repository.

roman pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 463d9e3  [FLINK-21028][task] Do not interrupt the source thread on stop with savepoint
     new 017b33d  [hotfix][task] Rename isStoppingBySyncSavepoint to ignoreEndOfInput
     new 7cb2154  [FLINK-21453][checkpointing][refactor] Replace advanceToEndOfTime with new CheckpointType.SAVEPOINT_TERMINATE
     new 1196dff  [FLINK-21453][checkpointing] Do not ignore endOfInput when terminating a job with savepoint

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/checkpoint/CheckpointCoordinator.java  | 44 ++++++----------
 .../runtime/checkpoint/CheckpointProperties.java   | 19 ++++---
 .../flink/runtime/checkpoint/CheckpointType.java   | 38 +++++++++++---
 .../flink/runtime/dispatcher/Dispatcher.java       |  5 +-
 .../flink/runtime/executiongraph/Execution.java    | 27 +++-------
 .../network/api/serialization/EventSerializer.java | 16 ++++--
 .../runtime/jobgraph/tasks/AbstractInvokable.java  |  6 +--
 .../jobmanager/slots/TaskManagerGateway.java       |  5 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  6 +--
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  5 +-
 .../runtime/jobmaster/RpcTaskManagerGateway.java   |  9 +---
 .../flink/runtime/minicluster/MiniCluster.java     |  4 +-
 .../handler/job/savepoints/SavepointHandlers.java  |  4 +-
 .../flink/runtime/scheduler/SchedulerBase.java     |  4 +-
 .../flink/runtime/scheduler/SchedulerNG.java       |  3 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  9 ++--
 .../runtime/taskexecutor/TaskExecutorGateway.java  |  5 +-
 .../org/apache/flink/runtime/taskmanager/Task.java |  8 +--
 .../flink/runtime/webmonitor/RestfulGateway.java   |  5 +-
 .../checkpoint/CheckpointCoordinatorTest.java      | 15 ++----
 .../CheckpointCoordinatorTestingUtils.java         |  3 +-
 .../CheckpointCoordinatorTriggeringTest.java       | 33 ++----------
 .../checkpoint/CheckpointRequestDeciderTest.java   |  7 +--
 .../runtime/checkpoint/CheckpointTypeTest.java     |  2 +-
 .../runtime/checkpoint/PendingCheckpointTest.java  |  2 +-
 .../utils/SimpleAckingTaskManagerGateway.java      | 20 ++-----
 .../serialization/CheckpointSerializationTest.java |  2 +-
 .../api/serialization/EventSerializerTest.java     | 25 ++++++++-
 .../jobmaster/utils/TestingJobMasterGateway.java   |  6 +--
 .../CoordinatorEventsExactlyOnceITCase.java        |  4 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    |  7 +--
 .../runtime/scheduler/TestingSchedulerNG.java      |  3 +-
 .../taskexecutor/TestingTaskExecutorGateway.java   |  3 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java     | 16 ++----
 .../runtime/webmonitor/TestingRestfulGateway.java  |  2 +-
 .../streaming/state/RocksDBAsyncSnapshotTest.java  |  6 +--
 .../streaming/api/operators/StreamSource.java      |  2 +-
 .../streaming/runtime/tasks/OperatorChain.java     | 10 ++--
 .../runtime/tasks/SourceOperatorStreamTask.java    |  9 ++--
 .../streaming/runtime/tasks/SourceStreamTask.java  | 11 ++--
 .../flink/streaming/runtime/tasks/StreamTask.java  | 40 +++++---------
 .../AbstractUdfStreamOperatorLifecycleTest.java    |  3 +-
 .../api/operators/async/AsyncWaitOperatorTest.java |  5 +-
 .../io/CheckpointBarrierAlignerTestBase.java       |  4 +-
 .../runtime/io/CheckpointSequenceValidator.java    |  4 +-
 .../runtime/tasks/OneInputStreamTaskTest.java      |  4 +-
 .../runtime/tasks/RestoreStreamTaskTest.java       |  2 +-
 .../tasks/SourceExternalCheckpointTriggerTest.java |  6 +--
 .../tasks/SourceOperatorStreamTaskTest.java        |  4 +-
 .../runtime/tasks/SourceStreamTaskTest.java        | 14 ++---
 .../runtime/tasks/SourceTaskTerminationTest.java   | 14 ++---
 .../tasks/StreamTaskExecutionDecorationTest.java   |  3 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |  3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 61 +++++++++++++++-------
 .../runtime/tasks/SynchronousCheckpointITCase.java | 12 ++---
 .../runtime/tasks/SynchronousCheckpointTest.java   |  5 +-
 .../jobmaster/JobMasterStopWithSavepointIT.java    | 12 ++---
 .../jobmaster/JobMasterTriggerSavepointITCase.java |  3 +-
 .../state/StatefulOperatorChainedTaskTest.java     |  4 +-
 59 files changed, 261 insertions(+), 352 deletions(-)


[flink] 01/03: [hotfix][task] Rename isStoppingBySyncSavepoint to ignoreEndOfInput

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 017b33db14bebbc956a035c7c10dd2605152bbe1
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Feb 23 21:00:37 2021 +0100

    [hotfix][task] Rename isStoppingBySyncSavepoint to ignoreEndOfInput
---
 .../org/apache/flink/streaming/api/operators/StreamSource.java |  2 +-
 .../apache/flink/streaming/runtime/tasks/OperatorChain.java    | 10 +++++-----
 .../apache/flink/streaming/runtime/tasks/SourceStreamTask.java |  2 +-
 .../org/apache/flink/streaming/runtime/tasks/StreamTask.java   |  6 +++---
 4 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 30ade24..8ab8a6b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -119,7 +119,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
                 // interface,
                 // so we still need the following call to end the input
                 synchronized (lockingObject) {
-                    operatorChain.setIsStoppingBySyncSavepoint(false);
+                    operatorChain.setIgnoreEndOfInput(false);
                     operatorChain.endHeadOperatorInput(1);
                 }
             }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 9af6320..74dfbf5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -105,7 +105,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 
     private final OperatorEventDispatcherImpl operatorEventDispatcher;
 
-    private boolean isStoppingBySyncSavepoint;
+    private boolean ignoreEndOfInput;
 
     /**
      * Current status of the input stream of the operator chain. Watermarks explicitly generated by
@@ -291,7 +291,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
      * @param inputId the input ID starts from 1 which indicates the first input.
      */
     public void endHeadOperatorInput(int inputId) throws Exception {
-        if (headOperatorWrapper != null && !isStoppingBySyncSavepoint) {
+        if (headOperatorWrapper != null && !ignoreEndOfInput) {
             headOperatorWrapper.endOperatorInput(inputId);
         }
     }
@@ -317,7 +317,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
      */
     protected void closeOperators(StreamTaskActionExecutor actionExecutor) throws Exception {
         if (headOperatorWrapper != null) {
-            headOperatorWrapper.close(actionExecutor, isStoppingBySyncSavepoint);
+            headOperatorWrapper.close(actionExecutor, ignoreEndOfInput);
         }
     }
 
@@ -582,8 +582,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
         return (tailOperatorWrapper == null) ? null : tailOperatorWrapper.getStreamOperator();
     }
 
-    public void setIsStoppingBySyncSavepoint(boolean stoppingBySyncSavepoint) {
-        this.isStoppingBySyncSavepoint = stoppingBySyncSavepoint;
+    public void setIgnoreEndOfInput(boolean ignoreEndOfInput) {
+        this.ignoreEndOfInput = ignoreEndOfInput;
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index e749c52..db1fba9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -253,7 +253,7 @@ public class SourceStreamTask<
                 headOperator.run(lock, getStreamStatusMaintainer(), operatorChain);
                 if (!wasStoppedExternally && !isCanceled()) {
                     synchronized (lock) {
-                        operatorChain.setIsStoppingBySyncSavepoint(false);
+                        operatorChain.setIgnoreEndOfInput(false);
                     }
                 }
                 completionFuture.complete(null);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4d7b3c7..32b8612 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -404,7 +404,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
         if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
             // allow to process further EndOfPartition events
             activeSyncSavepointId = null;
-            operatorChain.setIsStoppingBySyncSavepoint(false);
+            operatorChain.setIgnoreEndOfInput(false);
         }
         syncSavepointId = null;
     }
@@ -415,7 +415,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                 "at most one stop-with-savepoint checkpoint at a time is allowed");
         syncSavepointId = checkpointId;
         activeSyncSavepointId = checkpointId;
-        operatorChain.setIsStoppingBySyncSavepoint(true);
+        operatorChain.setIgnoreEndOfInput(true);
     }
 
     @VisibleForTesting
@@ -999,7 +999,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                         } else if (activeSyncSavepointId != null
                                 && activeSyncSavepointId < checkpointMetaData.getCheckpointId()) {
                             activeSyncSavepointId = null;
-                            operatorChain.setIsStoppingBySyncSavepoint(false);
+                            operatorChain.setIgnoreEndOfInput(false);
                         }
 
                         subtaskCheckpointCoordinator.checkpointState(


[flink] 03/03: [FLINK-21453][checkpointing] Do not ignore endOfInput when terminating a job with savepoint

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1196dff8f1630a3290d3c809d56c2ee8dd78f5ef
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Feb 23 21:07:45 2021 +0100

    [FLINK-21453][checkpointing] Do not ignore endOfInput when terminating a job with savepoint
    
    When job is stopping with savepoint WITH drain flag (terminating), there is no intention
    to resume the job ever from that savepoint. Hence we have to make sure that we flush
    all of the buffered records from the job. To do that, we need to invoke endOfInput.
---
 .../flink/runtime/checkpoint/CheckpointType.java   |  4 ++
 .../flink/streaming/runtime/tasks/StreamTask.java  |  8 ++--
 .../streaming/runtime/tasks/StreamTaskTest.java    | 43 ++++++++++++++++++----
 3 files changed, 44 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
index 02c5c1f..d123669 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
@@ -59,6 +59,10 @@ public enum CheckpointType {
         return getPostCheckpointAction() == PostCheckpointAction.TERMINATE;
     }
 
+    public boolean shouldIgnoreEndOfInput() {
+        return getPostCheckpointAction() == PostCheckpointAction.SUSPEND;
+    }
+
     /** What's the intended action after the checkpoint (relevant for stopping with savepoint). */
     public enum PostCheckpointAction {
         NONE,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 573bd31..f86ebd3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -409,13 +409,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
         syncSavepointId = null;
     }
 
-    private void setSynchronousSavepointId(long checkpointId) {
+    private void setSynchronousSavepointId(long checkpointId, boolean ignoreEndOfInput) {
         Preconditions.checkState(
                 syncSavepointId == null,
                 "at most one stop-with-savepoint checkpoint at a time is allowed");
         syncSavepointId = checkpointId;
         activeSyncSavepointId = checkpointId;
-        operatorChain.setIgnoreEndOfInput(true);
+        operatorChain.setIgnoreEndOfInput(ignoreEndOfInput);
     }
 
     @VisibleForTesting
@@ -977,7 +977,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
             actionExecutor.runThrowing(
                     () -> {
                         if (checkpointOptions.getCheckpointType().isSynchronous()) {
-                            setSynchronousSavepointId(checkpointMetaData.getCheckpointId());
+                            setSynchronousSavepointId(
+                                    checkpointMetaData.getCheckpointId(),
+                                    checkpointOptions.getCheckpointType().shouldIgnoreEndOfInput());
 
                             if (checkpointOptions.getCheckpointType().shouldAdvanceToEndOfTime()) {
                                 advanceToEndOfEventTime();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 904097e..f9faf02 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
@@ -154,7 +155,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT_SUSPEND;
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
 import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY;
@@ -190,19 +190,45 @@ public class StreamTaskTest extends TestLogger {
     @Rule public final Timeout timeoutPerTest = Timeout.seconds(30);
 
     @Test
-    public void testSyncSavepointCompleted() throws Exception {
-        testSyncSavepointWithEndInput(StreamTask::notifyCheckpointCompleteAsync, false);
+    public void testSavepointSuspendCompleted() throws Exception {
+        testSyncSavepointWithEndInput(
+                StreamTask::notifyCheckpointCompleteAsync, CheckpointType.SAVEPOINT_SUSPEND, false);
+    }
+
+    @Test
+    public void testSavepointTerminateCompleted() throws Exception {
+        testSyncSavepointWithEndInput(
+                StreamTask::notifyCheckpointCompleteAsync,
+                CheckpointType.SAVEPOINT_TERMINATE,
+                true);
+    }
+
+    @Test
+    public void testSavepointSuspendedAborted() throws Exception {
+        testSyncSavepointWithEndInput(
+                (task, id) -> task.abortCheckpointOnBarrier(id, new RuntimeException()),
+                CheckpointType.SAVEPOINT_SUSPEND,
+                true);
     }
 
     @Test
-    public void testSyncSavepointAborted() throws Exception {
+    public void testSavepointTerminateAborted() throws Exception {
         testSyncSavepointWithEndInput(
-                (task, id) -> task.abortCheckpointOnBarrier(id, new RuntimeException()), true);
+                (task, id) -> task.abortCheckpointOnBarrier(id, new RuntimeException()),
+                CheckpointType.SAVEPOINT_TERMINATE,
+                true);
     }
 
     @Test
-    public void testSyncSavepointAbortedAsync() throws Exception {
-        testSyncSavepointWithEndInput(StreamTask::notifyCheckpointAbortAsync, true);
+    public void testSavepointSuspendAbortedAsync() throws Exception {
+        testSyncSavepointWithEndInput(
+                StreamTask::notifyCheckpointAbortAsync, CheckpointType.SAVEPOINT_SUSPEND, true);
+    }
+
+    @Test
+    public void testSavepointTerminateAbortedAsync() throws Exception {
+        testSyncSavepointWithEndInput(
+                StreamTask::notifyCheckpointAbortAsync, CheckpointType.SAVEPOINT_TERMINATE, true);
     }
 
     /**
@@ -217,6 +243,7 @@ public class StreamTaskTest extends TestLogger {
      */
     private void testSyncSavepointWithEndInput(
             BiConsumerWithException<StreamTask<?, ?>, Long, IOException> savepointResult,
+            CheckpointType checkpointType,
             boolean expectEndInput)
             throws Exception {
         StreamTaskMailboxTestHarness<String> harness =
@@ -237,7 +264,7 @@ public class StreamTaskTest extends TestLogger {
                     try {
                         harness.streamTask.triggerCheckpointOnBarrier(
                                 new CheckpointMetaData(checkpointId, checkpointId),
-                                new CheckpointOptions(SAVEPOINT_SUSPEND, getDefault()),
+                                new CheckpointOptions(checkpointType, getDefault()),
                                 new CheckpointMetrics());
                     } catch (IOException e) {
                         fail(e.getMessage());


[flink] 02/03: [FLINK-21453][checkpointing][refactor] Replace advanceToEndOfTime with new CheckpointType.SAVEPOINT_TERMINATE

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7cb2154d7febe419d4d4b80900984c4132a90194
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Feb 23 20:34:00 2021 +0100

    [FLINK-21453][checkpointing][refactor] Replace advanceToEndOfTime with new CheckpointType.SAVEPOINT_TERMINATE
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 44 ++++++++--------------
 .../runtime/checkpoint/CheckpointProperties.java   | 19 +++++-----
 .../flink/runtime/checkpoint/CheckpointType.java   | 34 +++++++++++++----
 .../flink/runtime/dispatcher/Dispatcher.java       |  5 +--
 .../flink/runtime/executiongraph/Execution.java    | 27 ++++---------
 .../network/api/serialization/EventSerializer.java | 16 +++++---
 .../runtime/jobgraph/tasks/AbstractInvokable.java  |  6 +--
 .../jobmanager/slots/TaskManagerGateway.java       |  5 +--
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  6 +--
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  5 +--
 .../runtime/jobmaster/RpcTaskManagerGateway.java   |  9 +----
 .../flink/runtime/minicluster/MiniCluster.java     |  4 +-
 .../handler/job/savepoints/SavepointHandlers.java  |  4 +-
 .../flink/runtime/scheduler/SchedulerBase.java     |  4 +-
 .../flink/runtime/scheduler/SchedulerNG.java       |  3 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  9 ++---
 .../runtime/taskexecutor/TaskExecutorGateway.java  |  5 +--
 .../org/apache/flink/runtime/taskmanager/Task.java |  8 +---
 .../flink/runtime/webmonitor/RestfulGateway.java   |  5 +--
 .../checkpoint/CheckpointCoordinatorTest.java      | 15 +++-----
 .../CheckpointCoordinatorTestingUtils.java         |  3 +-
 .../CheckpointCoordinatorTriggeringTest.java       | 33 +++-------------
 .../checkpoint/CheckpointRequestDeciderTest.java   |  7 +---
 .../runtime/checkpoint/CheckpointTypeTest.java     |  2 +-
 .../runtime/checkpoint/PendingCheckpointTest.java  |  2 +-
 .../utils/SimpleAckingTaskManagerGateway.java      | 20 ++--------
 .../serialization/CheckpointSerializationTest.java |  2 +-
 .../api/serialization/EventSerializerTest.java     | 25 +++++++++++-
 .../jobmaster/utils/TestingJobMasterGateway.java   |  6 +--
 .../CoordinatorEventsExactlyOnceITCase.java        |  4 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    |  7 +---
 .../runtime/scheduler/TestingSchedulerNG.java      |  3 +-
 .../taskexecutor/TestingTaskExecutorGateway.java   |  3 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java     | 16 ++------
 .../runtime/webmonitor/TestingRestfulGateway.java  |  2 +-
 .../streaming/state/RocksDBAsyncSnapshotTest.java  |  6 +--
 .../runtime/tasks/SourceOperatorStreamTask.java    |  9 ++---
 .../streaming/runtime/tasks/SourceStreamTask.java  |  9 ++---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 28 ++++----------
 .../AbstractUdfStreamOperatorLifecycleTest.java    |  3 +-
 .../api/operators/async/AsyncWaitOperatorTest.java |  5 +--
 .../io/CheckpointBarrierAlignerTestBase.java       |  4 +-
 .../runtime/io/CheckpointSequenceValidator.java    |  4 +-
 .../runtime/tasks/OneInputStreamTaskTest.java      |  4 +-
 .../runtime/tasks/RestoreStreamTaskTest.java       |  2 +-
 .../tasks/SourceExternalCheckpointTriggerTest.java |  6 +--
 .../tasks/SourceOperatorStreamTaskTest.java        |  4 +-
 .../runtime/tasks/SourceStreamTaskTest.java        | 14 +++----
 .../runtime/tasks/SourceTaskTerminationTest.java   | 14 +++----
 .../tasks/StreamTaskExecutionDecorationTest.java   |  3 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |  3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 22 ++++-------
 .../runtime/tasks/SynchronousCheckpointITCase.java | 12 ++----
 .../runtime/tasks/SynchronousCheckpointTest.java   |  5 +--
 .../jobmaster/JobMasterStopWithSavepointIT.java    | 12 ++----
 .../jobmaster/JobMasterTriggerSavepointITCase.java |  3 +-
 .../state/StatefulOperatorChainedTaskTest.java     |  4 +-
 57 files changed, 210 insertions(+), 334 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 46da0d2..e0645fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointType.PostCheckpointAction;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -441,14 +442,13 @@ public class CheckpointCoordinator {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, false, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation);
     }
 
     /**
      * Triggers a synchronous savepoint with the given savepoint directory as a target.
      *
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers.
+     * @param terminate flag indicating if the job should terminate or just suspend
      * @param targetLocation Target location for the savepoint, optional. If null, the state
      *     backend's configured default will be used.
      * @return A future to the completed checkpoint
@@ -456,17 +456,16 @@ public class CheckpointCoordinator {
      *     savepoint directory has been configured
      */
     public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
-            final boolean advanceToEndOfEventTime, @Nullable final String targetLocation) {
+            final boolean terminate, @Nullable final String targetLocation) {
 
         final CheckpointProperties properties =
-                CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled);
+                CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, advanceToEndOfEventTime, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation);
     }
 
     private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
             final CheckpointProperties checkpointProperties,
-            final boolean advanceToEndOfEventTime,
             @Nullable final String targetLocation) {
 
         checkNotNull(checkpointProperties);
@@ -476,11 +475,7 @@ public class CheckpointCoordinator {
         final CompletableFuture<CompletedCheckpoint> resultFuture = new CompletableFuture<>();
         timer.execute(
                 () ->
-                        triggerCheckpoint(
-                                        checkpointProperties,
-                                        targetLocation,
-                                        false,
-                                        advanceToEndOfEventTime)
+                        triggerCheckpoint(checkpointProperties, targetLocation, false)
                                 .whenComplete(
                                         (completedCheckpoint, throwable) -> {
                                             if (throwable == null) {
@@ -502,25 +497,24 @@ public class CheckpointCoordinator {
      * @return a future to the completed checkpoint.
      */
     public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic) {
-        return triggerCheckpoint(checkpointProperties, null, isPeriodic, false);
+        return triggerCheckpoint(checkpointProperties, null, isPeriodic);
     }
 
     @VisibleForTesting
     public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
             CheckpointProperties props,
             @Nullable String externalSavepointLocation,
-            boolean isPeriodic,
-            boolean advanceToEndOfTime) {
+            boolean isPeriodic) {
 
-        if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
+        if (props.getCheckpointType().getPostCheckpointAction() == PostCheckpointAction.TERMINATE
+                && !(props.isSynchronous() && props.isSavepoint())) {
             return FutureUtils.completedExceptionally(
                     new IllegalArgumentException(
                             "Only synchronous savepoints are allowed to advance the watermark to MAX."));
         }
 
         CheckpointTriggerRequest request =
-                new CheckpointTriggerRequest(
-                        props, externalSavepointLocation, isPeriodic, advanceToEndOfTime);
+                new CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic);
         chooseRequestToExecute(request).ifPresent(this::startTriggeringCheckpoint);
         return request.onCompletionPromise;
     }
@@ -619,8 +613,7 @@ public class CheckpointCoordinator {
                                                         checkpointId,
                                                         checkpoint.getCheckpointStorageLocation(),
                                                         request.props,
-                                                        executions,
-                                                        request.advanceToEndOfTime);
+                                                        executions);
 
                                                 coordinatorsToCheckpoint.forEach(
                                                         (ctx) ->
@@ -820,8 +813,7 @@ public class CheckpointCoordinator {
             long checkpointID,
             CheckpointStorageLocation checkpointStorageLocation,
             CheckpointProperties props,
-            Execution[] executions,
-            boolean advanceToEndOfTime) {
+            Execution[] executions) {
 
         final CheckpointOptions checkpointOptions =
                 new CheckpointOptions(
@@ -834,8 +826,7 @@ public class CheckpointCoordinator {
         // send the messages to the tasks that trigger their checkpoint
         for (Execution execution : executions) {
             if (props.isSynchronous()) {
-                execution.triggerSynchronousSavepoint(
-                        checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
+                execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions);
             } else {
                 execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
             }
@@ -2037,21 +2028,18 @@ public class CheckpointCoordinator {
         final CheckpointProperties props;
         final @Nullable String externalSavepointLocation;
         final boolean isPeriodic;
-        final boolean advanceToEndOfTime;
         private final CompletableFuture<CompletedCheckpoint> onCompletionPromise =
                 new CompletableFuture<>();
 
         CheckpointTriggerRequest(
                 CheckpointProperties props,
                 @Nullable String externalSavepointLocation,
-                boolean isPeriodic,
-                boolean advanceToEndOfTime) {
+                boolean isPeriodic) {
 
             this.timestamp = System.currentTimeMillis();
             this.props = checkNotNull(props);
             this.externalSavepointLocation = externalSavepointLocation;
             this.isPeriodic = isPeriodic;
-            this.advanceToEndOfTime = advanceToEndOfTime;
         }
 
         CompletableFuture<CompletedCheckpoint> getOnCompletionFuture() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 96162a2..91c9374 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -238,14 +238,6 @@ public class CheckpointProperties implements Serializable {
     //  Factories and pre-configured properties
     // ------------------------------------------------------------------------
 
-    private static final CheckpointProperties SYNC_SAVEPOINT =
-            new CheckpointProperties(
-                    true, CheckpointType.SYNC_SAVEPOINT, false, false, false, false, false);
-
-    private static final CheckpointProperties SYNC_SAVEPOINT_NOT_FORCED =
-            new CheckpointProperties(
-                    false, CheckpointType.SYNC_SAVEPOINT, false, false, false, false, false);
-
     private static final CheckpointProperties SAVEPOINT =
             new CheckpointProperties(
                     true, CheckpointType.SAVEPOINT, false, false, false, false, false);
@@ -296,8 +288,15 @@ public class CheckpointProperties implements Serializable {
         return forced ? SAVEPOINT : SAVEPOINT_NO_FORCE;
     }
 
-    public static CheckpointProperties forSyncSavepoint(boolean forced) {
-        return forced ? SYNC_SAVEPOINT : SYNC_SAVEPOINT_NOT_FORCED;
+    public static CheckpointProperties forSyncSavepoint(boolean forced, boolean terminate) {
+        return new CheckpointProperties(
+                forced,
+                terminate ? CheckpointType.SAVEPOINT_TERMINATE : CheckpointType.SAVEPOINT_SUSPEND,
+                false,
+                false,
+                false,
+                false,
+                false);
     }
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
index a1df789..02c5c1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
@@ -22,22 +22,25 @@ package org.apache.flink.runtime.checkpoint;
 public enum CheckpointType {
 
     /** A checkpoint, full or incremental. */
-    CHECKPOINT(false, false),
+    CHECKPOINT(false, PostCheckpointAction.NONE),
 
     /** A regular savepoint. */
-    SAVEPOINT(true, false),
+    SAVEPOINT(true, PostCheckpointAction.NONE),
 
-    /** A savepoint taken while suspending/terminating the job. */
-    SYNC_SAVEPOINT(true, true);
+    /** A savepoint taken while suspending the job. */
+    SAVEPOINT_SUSPEND(true, PostCheckpointAction.SUSPEND),
+
+    /** A savepoint taken while terminating the job. */
+    SAVEPOINT_TERMINATE(true, PostCheckpointAction.TERMINATE);
 
     private final boolean isSavepoint;
 
-    private final boolean isSynchronous;
+    private final PostCheckpointAction postCheckpointAction;
 
-    CheckpointType(final boolean isSavepoint, final boolean isSynchronous) {
+    CheckpointType(final boolean isSavepoint, final PostCheckpointAction postCheckpointAction) {
 
         this.isSavepoint = isSavepoint;
-        this.isSynchronous = isSynchronous;
+        this.postCheckpointAction = postCheckpointAction;
     }
 
     public boolean isSavepoint() {
@@ -45,6 +48,21 @@ public enum CheckpointType {
     }
 
     public boolean isSynchronous() {
-        return isSynchronous;
+        return postCheckpointAction != PostCheckpointAction.NONE;
+    }
+
+    public PostCheckpointAction getPostCheckpointAction() {
+        return postCheckpointAction;
+    }
+
+    public boolean shouldAdvanceToEndOfTime() {
+        return getPostCheckpointAction() == PostCheckpointAction.TERMINATE;
+    }
+
+    /** What's the intended action after the checkpoint (relevant for stopping with savepoint). */
+    public enum PostCheckpointAction {
+        NONE,
+        SUSPEND,
+        TERMINATE
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 18939bb..a586f59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -729,15 +729,14 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     public CompletableFuture<String> stopWithSavepoint(
             final JobID jobId,
             final String targetDirectory,
-            final boolean advanceToEndOfEventTime,
+            final boolean terminate,
             final Time timeout) {
         final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
                 getJobMasterGatewayFuture(jobId);
 
         return jobMasterGatewayFuture.thenCompose(
                 (JobMasterGateway jobMasterGateway) ->
-                        jobMasterGateway.stopWithSavepoint(
-                                targetDirectory, advanceToEndOfEventTime, timeout));
+                        jobMasterGateway.stopWithSavepoint(targetDirectory, terminate, timeout));
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 50f05ac..8fa4e61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.CheckpointType.PostCheckpointAction;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -1111,7 +1112,7 @@ public class Execution
      */
     public void triggerCheckpoint(
             long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
-        triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions, false);
+        triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions);
     }
 
     /**
@@ -1120,26 +1121,17 @@ public class Execution
      * @param checkpointId of th checkpoint to trigger
      * @param timestamp of the checkpoint to trigger
      * @param checkpointOptions of the checkpoint to trigger
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers
      */
     public void triggerSynchronousSavepoint(
-            long checkpointId,
-            long timestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
-        triggerCheckpointHelper(
-                checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
+            long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
+        triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions);
     }
 
     private void triggerCheckpointHelper(
-            long checkpointId,
-            long timestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
 
         final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
-        if (advanceToEndOfEventTime
+        if (checkpointType.getPostCheckpointAction() == PostCheckpointAction.TERMINATE
                 && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
             throw new IllegalArgumentException(
                     "Only synchronous savepoints are allowed to advance the watermark to MAX.");
@@ -1151,12 +1143,7 @@ public class Execution
             final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
             taskManagerGateway.triggerCheckpoint(
-                    attemptId,
-                    getVertex().getJobId(),
-                    checkpointId,
-                    timestamp,
-                    checkpointOptions,
-                    advanceToEndOfEventTime);
+                    attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
         } else {
             LOG.debug(
                     "The execution has no slot assigned. This indicates that the execution is no longer running.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 210d7c7..f0a3cea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -64,7 +64,9 @@ public class EventSerializer {
 
     private static final int CHECKPOINT_TYPE_SAVEPOINT = 1;
 
-    private static final int CHECKPOINT_TYPE_SYNC_SAVEPOINT = 2;
+    private static final int CHECKPOINT_TYPE_SAVEPOINT_SUSPEND = 2;
+
+    private static final int CHECKPOINT_TYPE_SAVEPOINT_TERMINATE = 3;
 
     // ------------------------------------------------------------------------
     //  Serialization Logic
@@ -220,8 +222,10 @@ public class EventSerializer {
             typeInt = CHECKPOINT_TYPE_CHECKPOINT;
         } else if (checkpointType == CheckpointType.SAVEPOINT) {
             typeInt = CHECKPOINT_TYPE_SAVEPOINT;
-        } else if (checkpointType == CheckpointType.SYNC_SAVEPOINT) {
-            typeInt = CHECKPOINT_TYPE_SYNC_SAVEPOINT;
+        } else if (checkpointType == CheckpointType.SAVEPOINT_SUSPEND) {
+            typeInt = CHECKPOINT_TYPE_SAVEPOINT_SUSPEND;
+        } else if (checkpointType == CheckpointType.SAVEPOINT_TERMINATE) {
+            typeInt = CHECKPOINT_TYPE_SAVEPOINT_TERMINATE;
         } else {
             throw new IOException("Unknown checkpoint type: " + checkpointType);
         }
@@ -257,8 +261,10 @@ public class EventSerializer {
             checkpointType = CheckpointType.CHECKPOINT;
         } else if (checkpointTypeCode == CHECKPOINT_TYPE_SAVEPOINT) {
             checkpointType = CheckpointType.SAVEPOINT;
-        } else if (checkpointTypeCode == CHECKPOINT_TYPE_SYNC_SAVEPOINT) {
-            checkpointType = CheckpointType.SYNC_SAVEPOINT;
+        } else if (checkpointTypeCode == CHECKPOINT_TYPE_SAVEPOINT_SUSPEND) {
+            checkpointType = CheckpointType.SAVEPOINT_SUSPEND;
+        } else if (checkpointTypeCode == CHECKPOINT_TYPE_SAVEPOINT_TERMINATE) {
+            checkpointType = CheckpointType.SAVEPOINT_TERMINATE;
         } else {
             throw new IOException("Unknown checkpoint type code: " + checkpointTypeCode);
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 22fbb8a..c98ed6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -209,15 +209,11 @@ public abstract class AbstractInvokable {
      *
      * @param checkpointMetaData Meta data for about this checkpoint
      * @param checkpointOptions Options for performing this checkpoint
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers
      * @return future with value of {@code false} if the checkpoint was not carried out, {@code
      *     true} otherwise
      */
     public Future<Boolean> triggerCheckpointAsync(
-            CheckpointMetaData checkpointMetaData,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
         throw new UnsupportedOperationException(
                 String.format(
                         "triggerCheckpointAsync not supported by %s", this.getClass().getName()));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index d209481..c115f2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -127,16 +127,13 @@ public interface TaskManagerGateway extends TaskExecutorOperatorEventGateway {
      * @param checkpointId of the checkpoint to trigger
      * @param timestamp of the checkpoint to trigger
      * @param checkpointOptions of the checkpoint to trigger
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers
      */
     void triggerCheckpoint(
             ExecutionAttemptID executionAttemptID,
             JobID jobId,
             long checkpointId,
             long timestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime);
+            CheckpointOptions checkpointOptions);
 
     /**
      * Frees the slot with the given allocation ID.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 428a0cc..6cae89f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -725,11 +725,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
 
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            @Nullable final String targetDirectory,
-            final boolean advanceToEndOfEventTime,
-            final Time timeout) {
+            @Nullable final String targetDirectory, final boolean terminate, final Time timeout) {
 
-        return schedulerNG.stopWithSavepoint(targetDirectory, advanceToEndOfEventTime);
+        return schedulerNG.stopWithSavepoint(targetDirectory, terminate);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index b3345dc..6237985 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -234,14 +234,13 @@ public interface JobMasterGateway
      *
      * @param targetDirectory to which to write the savepoint data or null if the default savepoint
      *     directory should be used
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers
+     * @param terminate flag indicating if the job should terminate or just suspend
      * @param timeout for the rpc call
      * @return Future which is completed with the savepoint path once completed
      */
     CompletableFuture<String> stopWithSavepoint(
             @Nullable final String targetDirectory,
-            final boolean advanceToEndOfEventTime,
+            final boolean terminate,
             @RpcTimeout final Time timeout);
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index a7eb15f..714110e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -105,14 +105,9 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
             JobID jobId,
             long checkpointId,
             long timestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointOptions checkpointOptions) {
         taskExecutorGateway.triggerCheckpoint(
-                executionAttemptID,
-                checkpointId,
-                timestamp,
-                checkpointOptions,
-                advanceToEndOfEventTime);
+                executionAttemptID, checkpointId, timestamp, checkpointOptions);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 93335bd..21da08f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -668,11 +668,11 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
     }
 
     public CompletableFuture<String> stopWithSavepoint(
-            JobID jobId, String targetDirectory, boolean advanceToEndOfEventTime) {
+            JobID jobId, String targetDirectory, boolean terminate) {
         return runDispatcherCommand(
                 dispatcherGateway ->
                         dispatcherGateway.stopWithSavepoint(
-                                jobId, targetDirectory, advanceToEndOfEventTime, rpcTimeout));
+                                jobId, targetDirectory, terminate, rpcTimeout));
     }
 
     public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
index 8d9f710..35bfe85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
@@ -167,13 +167,13 @@ public class SavepointHandlers
                         HttpResponseStatus.BAD_REQUEST);
             }
 
-            final boolean advanceToEndOfEventTime = request.getRequestBody().shouldDrain();
+            final boolean shouldDrain = request.getRequestBody().shouldDrain();
             final String targetDirectory =
                     requestedTargetDirectory != null
                             ? requestedTargetDirectory
                             : defaultSavepointDir;
             return gateway.stopWithSavepoint(
-                    jobId, targetDirectory, advanceToEndOfEventTime, RpcUtils.INF_TIMEOUT);
+                    jobId, targetDirectory, shouldDrain, RpcUtils.INF_TIMEOUT);
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 1396c6d..c88ac57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -1045,7 +1045,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            final String targetDirectory, final boolean advanceToEndOfEventTime) {
+            final String targetDirectory, final boolean terminate) {
         mainThreadExecutor.assertRunningInMainThread();
 
         final CheckpointCoordinator checkpointCoordinator =
@@ -1082,7 +1082,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
-                        .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
+                        .triggerSynchronousSavepoint(terminate, targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
 
         final CompletableFuture<JobStatus> terminationFuture =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 06e74e6..e2fd16b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -146,8 +146,7 @@ public interface SchedulerNG {
 
     void declineCheckpoint(DeclineCheckpoint decline);
 
-    CompletableFuture<String> stopWithSavepoint(
-            String targetDirectory, boolean advanceToEndOfEventTime);
+    CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate);
 
     // ------------------------------------------------------------------------
     //  Operator Coordinator related methods
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 7e28e6e..bd224e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.CheckpointType.PostCheckpointAction;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -887,8 +888,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
             ExecutionAttemptID executionAttemptID,
             long checkpointId,
             long checkpointTimestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointOptions checkpointOptions) {
         log.debug(
                 "Trigger checkpoint {}@{} for {}.",
                 checkpointId,
@@ -896,7 +896,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
                 executionAttemptID);
 
         final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
-        if (advanceToEndOfEventTime
+        if (checkpointType.getPostCheckpointAction() == PostCheckpointAction.TERMINATE
                 && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
             throw new IllegalArgumentException(
                     "Only synchronous savepoints are allowed to advance the watermark to MAX.");
@@ -905,8 +905,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
         final Task task = taskSlotTable.getTask(executionAttemptID);
 
         if (task != null) {
-            task.triggerCheckpointBarrier(
-                    checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);
+            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
 
             return CompletableFuture.completedFuture(Acknowledge.get());
         } else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 8e59cee..829018d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -133,16 +133,13 @@ public interface TaskExecutorGateway extends RpcGateway, TaskExecutorOperatorEve
      * @param checkpointID unique id for the checkpoint
      * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
      * @param checkpointOptions for performing the checkpoint
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers
      * @return Future acknowledge if the checkpoint has been successfully triggered
      */
     CompletableFuture<Acknowledge> triggerCheckpoint(
             ExecutionAttemptID executionAttemptID,
             long checkpointID,
             long checkpointTimestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime);
+            CheckpointOptions checkpointOptions);
 
     /**
      * Confirm a checkpoint for the given task. The checkpoint is identified by the checkpoint ID
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index dbfdba5..d48a750 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1240,14 +1240,11 @@ public class Task
      * @param checkpointID The ID identifying the checkpoint.
      * @param checkpointTimestamp The timestamp associated with the checkpoint.
      * @param checkpointOptions Options for performing this checkpoint.
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers.
      */
     public void triggerCheckpointBarrier(
             final long checkpointID,
             final long checkpointTimestamp,
-            final CheckpointOptions checkpointOptions,
-            final boolean advanceToEndOfEventTime) {
+            final CheckpointOptions checkpointOptions) {
 
         final AbstractInvokable invokable = this.invokable;
         final CheckpointMetaData checkpointMetaData =
@@ -1255,8 +1252,7 @@ public class Task
 
         if (executionState == ExecutionState.RUNNING && invokable != null) {
             try {
-                invokable.triggerCheckpointAsync(
-                        checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+                invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
             } catch (RejectedExecutionException ex) {
                 // This may happen if the mailbox is closed. It means that the task is shutting
                 // down, so we just ignore it.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 7a54ae9..3aea713 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -135,15 +135,14 @@ public interface RestfulGateway extends RpcGateway {
      * @param jobId ID of the job for which the savepoint should be triggered.
      * @param targetDirectory to which to write the savepoint data or null if the default savepoint
      *     directory should be used
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers
+     * @param terminate flag indicating if the job should terminate or just suspend
      * @param timeout for the rpc call
      * @return Future which is completed with the savepoint path once completed
      */
     default CompletableFuture<String> stopWithSavepoint(
             final JobID jobId,
             final String targetDirectory,
-            final boolean advanceToEndOfEventTime,
+            final boolean terminate,
             @RpcTimeout final Time timeout) {
         throw new UnsupportedOperationException();
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 6379c8b..b2aa6db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2694,8 +2694,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                             CheckpointProperties.forCheckpoint(
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                             null,
-                            true,
-                            false);
+                            true);
             manuallyTriggeredScheduledExecutor.triggerAll();
             try {
                 onCompletionPromise.get();
@@ -2789,8 +2788,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                 jid,
                                 checkpointId,
                                 timestamp,
-                                checkpointOptions,
-                                advanceToEndOfEventTime) -> {});
+                                checkpointOptions) -> {});
         ExecutionVertex vertex2 =
                 mockExecutionVertex(
                         attemptID2,
@@ -2798,8 +2796,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                 jid,
                                 checkpointId,
                                 timestamp,
-                                checkpointOptions,
-                                advanceToEndOfEventTime) -> {});
+                                checkpointOptions) -> {});
 
         OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
         OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
@@ -2953,8 +2950,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                 jid,
                                 checkpointId,
                                 timestamp,
-                                checkpointOptions,
-                                advanceToEndOfEventTime) -> {});
+                                checkpointOptions) -> {});
         ExecutionVertex vertex2 =
                 mockExecutionVertex(
                         attemptID2,
@@ -2962,8 +2958,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                 jid,
                                 checkpointId,
                                 timestamp,
-                                checkpointOptions,
-                                advanceToEndOfEventTime) -> {});
+                                checkpointOptions) -> {});
 
         OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
         OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 0ae6797..b573669 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -583,8 +583,7 @@ public class CheckpointCoordinatorTestingUtils {
                                     jobId,
                                     (long) args[0],
                                     (long) args[1],
-                                    (CheckpointOptions) args[2],
-                                    false);
+                                    (CheckpointOptions) args[2]);
                             return null;
                         })
                 .when(mock)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 5509e71..db9f6b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -291,7 +291,6 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                         null,
-                        false,
                         false);
         manuallyTriggeredScheduledExecutor.triggerAll();
         assertFalse(onCompletionPromise2.isCompletedExceptionally());
@@ -326,12 +325,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         final ExecutionAttemptID attemptID = new ExecutionAttemptID();
         final AtomicInteger taskManagerCheckpointTriggeredTimes = new AtomicInteger(0);
         final SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer =
-                (executionAttemptID,
-                        jobId,
-                        checkpointId,
-                        timestamp,
-                        checkpointOptions,
-                        advanceToEndOfEventTime) ->
+                (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) ->
                         taskManagerCheckpointTriggeredTimes.incrementAndGet();
         ExecutionVertex vertex = mockExecutionVertex(attemptID, checkpointConsumer);
 
@@ -365,12 +359,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         final ExecutionAttemptID attemptID = new ExecutionAttemptID();
         final AtomicInteger taskManagerCheckpointTriggeredTimes = new AtomicInteger(0);
         final SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer =
-                (executionAttemptID,
-                        jobId,
-                        checkpointId,
-                        timestamp,
-                        checkpointOptions,
-                        advanceToEndOfEventTime) ->
+                (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) ->
                         taskManagerCheckpointTriggeredTimes.incrementAndGet();
         ExecutionVertex vertex = mockExecutionVertex(attemptID, checkpointConsumer);
 
@@ -415,12 +404,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         final ExecutionAttemptID attemptID = new ExecutionAttemptID();
         final AtomicInteger taskManagerCheckpointTriggeredTimes = new AtomicInteger(0);
         final SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer =
-                (executionAttemptID,
-                        jobId,
-                        checkpointId,
-                        timestamp,
-                        checkpointOptions,
-                        advanceToEndOfEventTime) ->
+                (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) ->
                         taskManagerCheckpointTriggeredTimes.incrementAndGet();
         ExecutionVertex vertex = mockExecutionVertex(attemptID, checkpointConsumer);
 
@@ -513,12 +497,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         final ExecutionAttemptID attemptID = new ExecutionAttemptID();
         final AtomicInteger taskManagerCheckpointTriggeredTimes = new AtomicInteger(0);
         final SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer =
-                (executionAttemptID,
-                        jobId,
-                        checkpointId,
-                        timestamp,
-                        checkpointOptions,
-                        advanceToEndOfEventTime) ->
+                (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) ->
                         taskManagerCheckpointTriggeredTimes.incrementAndGet();
         ExecutionVertex vertex = mockExecutionVertex(attemptID, checkpointConsumer);
 
@@ -625,8 +604,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
                 CheckpointProperties.forCheckpoint(
                         CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                 null,
-                true,
-                false);
+                true);
     }
 
     private CompletableFuture<CompletedCheckpoint> triggerNonPeriodicCheckpoint(
@@ -636,7 +614,6 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
                 CheckpointProperties.forCheckpoint(
                         CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                 null,
-                false,
                 false);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
index c380835..d34d798 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
@@ -275,14 +275,11 @@ public class CheckpointRequestDeciderTest {
 
     private static CheckpointTriggerRequest savepointRequest(boolean force, boolean periodic) {
         return new CheckpointTriggerRequest(
-                CheckpointProperties.forSavepoint(force), null, periodic, false);
+                CheckpointProperties.forSavepoint(force), null, periodic);
     }
 
     private static CheckpointTriggerRequest checkpointRequest(boolean periodic) {
         return new CheckpointTriggerRequest(
-                CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
-                null,
-                periodic,
-                false);
+                CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION), null, periodic);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
index 227f262..481559b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
@@ -36,6 +36,6 @@ public class CheckpointTypeTest {
     public void testOrdinalsAreConstant() {
         assertEquals(0, CheckpointType.CHECKPOINT.ordinal());
         assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
-        assertEquals(2, CheckpointType.SYNC_SAVEPOINT.ordinal());
+        assertEquals(2, CheckpointType.SAVEPOINT_SUSPEND.ordinal());
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 339cca9..9635da3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -129,7 +129,7 @@ public class PendingCheckpointTest {
     @Test
     public void testSyncSavepointCannotBeSubsumed() throws Exception {
         // Forced checkpoints cannot be subsumed
-        CheckpointProperties forced = CheckpointProperties.forSyncSavepoint(true);
+        CheckpointProperties forced = CheckpointProperties.forSyncSavepoint(true, false);
         PendingCheckpoint pending = createPendingCheckpoint(forced);
         assertFalse(pending.canBeSubsumed());
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 9e8903d..e16c209 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -61,12 +61,7 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
             (ignore1, ignore2) -> {};
 
     private CheckpointConsumer checkpointConsumer =
-            (executionAttemptID,
-                    jobId,
-                    checkpointId,
-                    timestamp,
-                    checkpointOptions,
-                    advanceToEndOfEventTime) -> {};
+            (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) -> {};
 
     public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> submitConsumer) {
         this.submitConsumer = submitConsumer;
@@ -147,16 +142,10 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
             JobID jobId,
             long checkpointId,
             long timestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointOptions checkpointOptions) {
 
         checkpointConsumer.accept(
-                executionAttemptID,
-                jobId,
-                checkpointId,
-                timestamp,
-                checkpointOptions,
-                advanceToEndOfEventTime);
+                executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions);
     }
 
     @Override
@@ -187,7 +176,6 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
                 JobID jobId,
                 long checkpointId,
                 long timestamp,
-                CheckpointOptions checkpointOptions,
-                boolean advanceToEndOfEventTime);
+                CheckpointOptions checkpointOptions);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java
index 0c698c6..855f3e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java
@@ -44,7 +44,7 @@ public class CheckpointSerializationTest {
     public void testSuspendingCheckpointBarrierSerialization() throws Exception {
         CheckpointOptions suspendSavepointToSerialize =
                 new CheckpointOptions(
-                        CheckpointType.SYNC_SAVEPOINT,
+                        CheckpointType.SAVEPOINT_SUSPEND,
                         new CheckpointStorageLocationReference(STORAGE_LOCATION_REF));
         testCheckpointBarrierSerialization(suspendSavepointToSerialize);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 842e7ce..6cf6b76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -27,6 +28,7 @@ import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 
 import org.junit.Test;
 
@@ -47,7 +49,28 @@ public class EventSerializerTest {
         EndOfPartitionEvent.INSTANCE,
         EndOfSuperstepEvent.INSTANCE,
         new CheckpointBarrier(
-                1678L, 4623784L, CheckpointOptions.forCheckpointWithDefaultLocation()),
+                1678L,
+                4623784L,
+                new CheckpointOptions(
+                        CheckpointType.CHECKPOINT,
+                        CheckpointStorageLocationReference.getDefault())),
+        new CheckpointBarrier(
+                1678L,
+                4623784L,
+                new CheckpointOptions(
+                        CheckpointType.SAVEPOINT, CheckpointStorageLocationReference.getDefault())),
+        new CheckpointBarrier(
+                1678L,
+                4623784L,
+                new CheckpointOptions(
+                        CheckpointType.SAVEPOINT_SUSPEND,
+                        CheckpointStorageLocationReference.getDefault())),
+        new CheckpointBarrier(
+                1678L,
+                4623784L,
+                new CheckpointOptions(
+                        CheckpointType.SAVEPOINT_TERMINATE,
+                        CheckpointStorageLocationReference.getDefault())),
         new TestTaskEvent(Math.random(), 12361231273L),
         new CancelCheckpointMarker(287087987329842L)
     };
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 2ca5f9d..3f6e6fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -400,10 +400,8 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            @Nullable final String targetDirectory,
-            final boolean advanceToEndOfEventTime,
-            final Time timeout) {
-        return stopWithSavepointFunction.apply(targetDirectory, advanceToEndOfEventTime);
+            @Nullable final String targetDirectory, final boolean terminate, final Time timeout) {
+        return stopWithSavepointFunction.apply(targetDirectory, terminate);
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index c1732ec..969368a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -425,9 +425,7 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
 
         @Override
         public Future<Boolean> triggerCheckpointAsync(
-                CheckpointMetaData checkpointMetaData,
-                CheckpointOptions checkpointOptions,
-                boolean advanceToEndOfEventTime) {
+                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
             actions.add(checkpointMetaData); // this signals the main thread should do a checkpoint
             return CompletableFuture.completedFuture(true);
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 26fa24d..08fa081 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -919,12 +919,7 @@ public class DefaultSchedulerTest extends TestLogger {
                 .getLogicalSlotBuilder()
                 .setTaskManagerGateway(taskManagerGateway);
         taskManagerGateway.setCheckpointConsumer(
-                (executionAttemptID,
-                        jobId,
-                        checkpointId,
-                        timestamp,
-                        checkpointOptions,
-                        advanceToEndOfEventTime) -> {
+                (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) -> {
                     checkpointTriggeredLatch.countDown();
                 });
         return checkpointTriggeredLatch;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 67cbada..ab07a1c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -203,8 +203,7 @@ public class TestingSchedulerNG implements SchedulerNG {
     }
 
     @Override
-    public CompletableFuture<String> stopWithSavepoint(
-            String targetDirectory, boolean advanceToEndOfEventTime) {
+    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
         failOperation();
         return null;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 9c37d1f..062a324 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -212,8 +212,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
             ExecutionAttemptID executionAttemptID,
             long checkpointID,
             long checkpointTimestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointOptions checkpointOptions) {
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 94044a0..d769a5f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -83,7 +83,7 @@ public class TaskAsyncCallTest extends TestLogger {
 
     /**
      * Triggered when {@link CheckpointsInOrderInvokable#triggerCheckpointAsync(CheckpointMetaData,
-     * CheckpointOptions, boolean)} was called {@link #numCalls} times.
+     * CheckpointOptions)} was called {@link #numCalls} times.
      */
     private static OneShotLatch triggerLatch;
 
@@ -121,10 +121,7 @@ public class TaskAsyncCallTest extends TestLogger {
 
             for (int i = 1; i <= numCalls; i++) {
                 task.triggerCheckpointBarrier(
-                        i,
-                        156865867234L,
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false);
+                        i, 156865867234L, CheckpointOptions.forCheckpointWithDefaultLocation());
             }
 
             triggerLatch.await();
@@ -147,10 +144,7 @@ public class TaskAsyncCallTest extends TestLogger {
 
             for (int i = 1; i <= numCalls; i++) {
                 task.triggerCheckpointBarrier(
-                        i,
-                        156865867234L,
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false);
+                        i, 156865867234L, CheckpointOptions.forCheckpointWithDefaultLocation());
                 task.notifyCheckpointComplete(i);
             }
 
@@ -260,9 +254,7 @@ public class TaskAsyncCallTest extends TestLogger {
 
         @Override
         public Future<Boolean> triggerCheckpointAsync(
-                CheckpointMetaData checkpointMetaData,
-                CheckpointOptions checkpointOptions,
-                boolean advanceToEndOfEventTime) {
+                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
             lastCheckpointId++;
             if (checkpointMetaData.getCheckpointId() == lastCheckpointId) {
                 if (lastCheckpointId == numCalls) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index 8c83ff5..7ff7ac8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -264,7 +264,7 @@ public class TestingRestfulGateway implements RestfulGateway {
 
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            JobID jobId, String targetDirectory, boolean advanceToEndOfTime, Time timeout) {
+            JobID jobId, String targetDirectory, boolean terminate, Time timeout) {
         return stopWithSavepointFunction.apply(jobId, targetDirectory);
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 934715b..5532eff 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -220,8 +220,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 
         task.triggerCheckpointAsync(
                         new CheckpointMetaData(42, 17),
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false)
+                        CheckpointOptions.forCheckpointWithDefaultLocation())
                 .get();
 
         testHarness.processElement(new StreamRecord<>("Wohoo", 0));
@@ -342,8 +341,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 
         task.triggerCheckpointAsync(
                         new CheckpointMetaData(42, 17),
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false)
+                        CheckpointOptions.forCheckpointWithDefaultLocation())
                 .get();
 
         testHarness.processElement(new StreamRecord<>("Wohoo", 0));
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index 54ffac2..8181e02 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -91,12 +91,9 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
 
     @Override
     public Future<Boolean> triggerCheckpointAsync(
-            CheckpointMetaData checkpointMetaData,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
         if (!isExternallyInducedSource) {
-            return super.triggerCheckpointAsync(
-                    checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
         } else {
             return CompletableFuture.completedFuture(isRunning());
         }
@@ -127,7 +124,7 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
         final CheckpointMetaData checkpointMetaData =
                 new CheckpointMetaData(checkpointId, timestamp);
 
-        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, false);
+        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
     }
 
     // ---------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index db1fba9..42b1ccf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -111,7 +111,7 @@ public class SourceStreamTask<
                             try {
                                 SourceStreamTask.super
                                         .triggerCheckpointAsync(
-                                                checkpointMetaData, checkpointOptions, false)
+                                                checkpointMetaData, checkpointOptions)
                                         .get();
                             } catch (RuntimeException e) {
                                 throw e;
@@ -217,12 +217,9 @@ public class SourceStreamTask<
 
     @Override
     public Future<Boolean> triggerCheckpointAsync(
-            CheckpointMetaData checkpointMetaData,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
         if (!externallyInducedCheckpoints) {
-            return super.triggerCheckpointAsync(
-                    checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
         } else {
             // we do not trigger checkpoints here, we simply state whether we can trigger them
             synchronized (lock) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 32b8612..573bd31 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -850,9 +850,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
 
     @Override
     public Future<Boolean> triggerCheckpointAsync(
-            CheckpointMetaData checkpointMetaData,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
 
         CompletableFuture<Boolean> result = new CompletableFuture<>();
         mainMailboxExecutor.execute(
@@ -864,11 +862,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                                             System.currentTimeMillis()
                                                     - checkpointMetaData.getTimestamp());
                     try {
-                        result.complete(
-                                triggerCheckpoint(
-                                        checkpointMetaData,
-                                        checkpointOptions,
-                                        advanceToEndOfEventTime));
+                        result.complete(triggerCheckpoint(checkpointMetaData, checkpointOptions));
                     } catch (Exception ex) {
                         // Report the failure both via the Future result but also to the mailbox
                         result.completeExceptionally(ex);
@@ -882,9 +876,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
     }
 
     private boolean triggerCheckpoint(
-            CheckpointMetaData checkpointMetaData,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime)
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
             throws Exception {
         try {
             // No alignment if we inject a checkpoint
@@ -895,11 +887,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                     checkpointMetaData.getCheckpointId(), checkpointOptions);
 
             boolean success =
-                    performCheckpoint(
-                            checkpointMetaData,
-                            checkpointOptions,
-                            checkpointMetrics,
-                            advanceToEndOfEventTime);
+                    performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
             if (!success) {
                 declineCheckpoint(checkpointMetaData.getCheckpointId());
             }
@@ -945,8 +933,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
             throws IOException {
 
         try {
-            if (performCheckpoint(
-                    checkpointMetaData, checkpointOptions, checkpointMetrics, false)) {
+            if (performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics)) {
                 if (isSynchronousSavepointId(checkpointMetaData.getCheckpointId())) {
                     runSynchronousSavepointMailboxLoop();
                 }
@@ -977,8 +964,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
     private boolean performCheckpoint(
             CheckpointMetaData checkpointMetaData,
             CheckpointOptions checkpointOptions,
-            CheckpointMetrics checkpointMetrics,
-            boolean advanceToEndOfTime)
+            CheckpointMetrics checkpointMetrics)
             throws Exception {
 
         LOG.debug(
@@ -993,7 +979,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                         if (checkpointOptions.getCheckpointType().isSynchronous()) {
                             setSynchronousSavepointId(checkpointMetaData.getCheckpointId());
 
-                            if (advanceToEndOfTime) {
+                            if (checkpointOptions.getCheckpointType().shouldAdvanceToEndOfTime()) {
                                 advanceToEndOfEventTime();
                             }
                         } else if (activeSyncSavepointId != null
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 904654d..9e15fe2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -283,8 +283,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                                                             new CheckpointMetaData(
                                                                     0, System.currentTimeMillis()),
                                                             CheckpointOptions
-                                                                    .forCheckpointWithDefaultLocation(),
-                                                            false)
+                                                                    .forCheckpointWithDefaultLocation())
                                                     .get()) {
                                         LifecycleTrackingStreamSource.runFinish.trigger();
                                     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 5af7bc9..19184ac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -526,7 +526,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
                 new CheckpointMetaData(checkpointId, checkpointTimestamp);
 
         task.triggerCheckpointAsync(
-                checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false);
+                checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
 
         taskStateManagerMock.getWaitForReportLatch().await();
 
@@ -569,8 +569,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
         restoredTask
                 .triggerCheckpointAsync(
                         new CheckpointMetaData(checkpointId, checkpointTimestamp),
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false)
+                        CheckpointOptions.forCheckpointWithDefaultLocation())
                 .get();
 
         restoredTaskHarness.processElement(new StreamRecord<>(8, initialTime + 8));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
index 33ad7d0..6e939dd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
@@ -1108,9 +1108,7 @@ public abstract class CheckpointBarrierAlignerTestBase {
 
         @Override
         public Future<Boolean> triggerCheckpointAsync(
-                CheckpointMetaData checkpointMetaData,
-                CheckpointOptions checkpointOptions,
-                boolean advanceToEndOfEventTime) {
+                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
             throw new UnsupportedOperationException("should never be called");
         }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
index 180ce14..44dbe29 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
@@ -49,9 +49,7 @@ class CheckpointSequenceValidator extends AbstractInvokable {
 
     @Override
     public Future<Boolean> triggerCheckpointAsync(
-            CheckpointMetaData checkpointMetaData,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
         throw new UnsupportedOperationException("should never be called");
     }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index d935af9..1c9c270 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -607,9 +607,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
         streamTask
                 .triggerCheckpointAsync(
-                        checkpointMetaData,
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false)
+                        checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation())
                 .get();
 
         // since no state was set, there shouldn't be restore calls
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
index ae3d14d..14c25d6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
@@ -285,7 +285,7 @@ public class RestoreStreamTaskTest extends TestLogger {
         testHarness.taskStateManager.setWaitForReportLatch(new OneShotLatch());
 
         streamTask.triggerCheckpointAsync(
-                checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false);
+                checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
 
         testHarness.taskStateManager.getWaitForReportLatch().await();
         long reportedCheckpointId = testHarness.taskStateManager.getReportedCheckpointId();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
index 303cf37..7308e26 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
@@ -89,8 +89,7 @@ public class SourceExternalCheckpointTriggerTest {
                 sourceTask
                         .triggerCheckpointAsync(
                                 new CheckpointMetaData(32, 829),
-                                CheckpointOptions.forCheckpointWithDefaultLocation(),
-                                false)
+                                CheckpointOptions.forCheckpointWithDefaultLocation())
                         .get());
 
         // step by step let the source thread emit elements
@@ -111,8 +110,7 @@ public class SourceExternalCheckpointTriggerTest {
                 sourceTask
                         .triggerCheckpointAsync(
                                 new CheckpointMetaData(34, 900),
-                                CheckpointOptions.forCheckpointWithDefaultLocation(),
-                                false)
+                                CheckpointOptions.forCheckpointWithDefaultLocation())
                         .get());
 
         sync.trigger();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 123fcca..b5edd92 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -98,7 +98,7 @@ public class SourceOperatorStreamTaskTest {
 
             final CheckpointOptions checkpointOptions =
                     new CheckpointOptions(
-                            CheckpointType.SYNC_SAVEPOINT,
+                            CheckpointType.SAVEPOINT_TERMINATE,
                             CheckpointStorageLocationReference.getDefault());
             triggerCheckpointWaitForFinish(testHarness, checkpointId, checkpointOptions);
 
@@ -202,7 +202,7 @@ public class SourceOperatorStreamTaskTest {
         Future<Boolean> checkpointFuture =
                 testHarness
                         .getStreamTask()
-                        .triggerCheckpointAsync(checkpointMetaData, checkpointOptions, true);
+                        .triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
 
         // Wait until the checkpoint finishes.
         // We have to mark the source reader as available here, otherwise the runMailboxStep() call
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index ebafdbf..8ba4739 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -76,7 +76,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-import static org.apache.flink.runtime.checkpoint.CheckpointType.SYNC_SAVEPOINT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT_SUSPEND;
 import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -115,8 +115,7 @@ public class SourceStreamTaskTest {
         Future<Boolean> triggerFuture =
                 harness.streamTask.triggerCheckpointAsync(
                         new CheckpointMetaData(1, 1),
-                        new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
-                        false);
+                        new CheckpointOptions(SAVEPOINT_SUSPEND, getDefault()));
         while (!triggerFuture.isDone()) {
             harness.streamTask.runMailboxStep();
         }
@@ -175,8 +174,7 @@ public class SourceStreamTaskTest {
         Future<Boolean> triggerFuture =
                 harness.streamTask.triggerCheckpointAsync(
                         new CheckpointMetaData(1L, System.currentTimeMillis()),
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false);
+                        CheckpointOptions.forCheckpointWithDefaultLocation());
 
         assertFalse(triggerFuture.isDone());
         Thread.sleep(sleepTime);
@@ -590,8 +588,7 @@ public class SourceStreamTaskTest {
             Future<Boolean> triggerFuture =
                     harness.streamTask.triggerCheckpointAsync(
                             new CheckpointMetaData(checkpointId, 1),
-                            new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
-                            false);
+                            new CheckpointOptions(SAVEPOINT_SUSPEND, getDefault()));
             while (!triggerFuture.isDone()) {
                 harness.streamTask.runMailboxStep();
             }
@@ -711,8 +708,7 @@ public class SourceStreamTaskTest {
                 try {
                     sourceTask.triggerCheckpointAsync(
                             new CheckpointMetaData(currentCheckpointId, 0L),
-                            CheckpointOptions.forCheckpointWithDefaultLocation(),
-                            false);
+                            CheckpointOptions.forCheckpointWithDefaultLocation());
                 } catch (RejectedExecutionException e) {
                     // We are late with a checkpoint, the mailbox is already closed.
                     return false;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
index 43df989..9aa0fcc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
@@ -74,7 +74,7 @@ public class SourceTaskTerminationTest extends TestLogger {
         stopWithSavepointStreamTaskTestHelper(false);
     }
 
-    private void stopWithSavepointStreamTaskTestHelper(final boolean withMaxWatermark)
+    private void stopWithSavepointStreamTaskTestHelper(final boolean shouldTerminate)
             throws Exception {
         final long syncSavepointId = 34L;
 
@@ -90,8 +90,7 @@ public class SourceTaskTerminationTest extends TestLogger {
 
         srcTask.triggerCheckpointAsync(
                         new CheckpointMetaData(31L, 900),
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false)
+                        CheckpointOptions.forCheckpointWithDefaultLocation())
                 .get();
 
         verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 31L);
@@ -101,12 +100,13 @@ public class SourceTaskTerminationTest extends TestLogger {
         srcTask.triggerCheckpointAsync(
                         new CheckpointMetaData(syncSavepointId, 900),
                         new CheckpointOptions(
-                                CheckpointType.SYNC_SAVEPOINT,
-                                CheckpointStorageLocationReference.getDefault()),
-                        withMaxWatermark)
+                                shouldTerminate
+                                        ? CheckpointType.SAVEPOINT_TERMINATE
+                                        : CheckpointType.SAVEPOINT_SUSPEND,
+                                CheckpointStorageLocationReference.getDefault()))
                 .get();
 
-        if (withMaxWatermark) {
+        if (shouldTerminate) {
             // if we are in TERMINATE mode, we expect the source task
             // to emit MAX_WM before the SYNC_SAVEPOINT barrier.
             verifyWatermark(srcTaskTestHarness.getOutput(), Watermark.MAX_WATERMARK);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
index 1d7f33e..17b604d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
@@ -74,8 +74,7 @@ public class StreamTaskExecutionDecorationTest {
                 new CheckpointMetaData(1, 2),
                 new CheckpointOptions(
                         CheckpointType.CHECKPOINT,
-                        new CheckpointStorageLocationReference(new byte[] {1})),
-                false);
+                        new CheckpointStorageLocationReference(new byte[] {1})));
         Assert.assertTrue("mailbox is empty", mailbox.hasMail());
         Assert.assertFalse("execution decorator was called preliminary", decorator.wasCalled());
         mailbox.drain()
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 12f3676..235cdef 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -199,8 +199,7 @@ public class StreamTaskTerminationTest extends TestLogger {
         task.triggerCheckpointBarrier(
                 checkpointId,
                 checkpointTimestamp,
-                CheckpointOptions.forCheckpointWithDefaultLocation(),
-                false);
+                CheckpointOptions.forCheckpointWithDefaultLocation());
 
         // wait until the task has completed execution
         taskRun.get();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index bfd7d54..904097e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -154,7 +154,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-import static org.apache.flink.runtime.checkpoint.CheckpointType.SYNC_SAVEPOINT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT_SUSPEND;
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
 import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY;
@@ -237,7 +237,7 @@ public class StreamTaskTest extends TestLogger {
                     try {
                         harness.streamTask.triggerCheckpointOnBarrier(
                                 new CheckpointMetaData(checkpointId, checkpointId),
-                                new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
+                                new CheckpointOptions(SAVEPOINT_SUSPEND, getDefault()),
                                 new CheckpointMetrics());
                     } catch (IOException e) {
                         fail(e.getMessage());
@@ -649,8 +649,7 @@ public class StreamTaskTest extends TestLogger {
 
         streamTask.triggerCheckpointAsync(
                 new CheckpointMetaData(42L, 1L),
-                CheckpointOptions.forCheckpointWithDefaultLocation(),
-                false);
+                CheckpointOptions.forCheckpointWithDefaultLocation());
 
         try {
             task.waitForTaskCompletion(false);
@@ -701,8 +700,7 @@ public class StreamTaskTest extends TestLogger {
 
         streamTask.triggerCheckpointAsync(
                 new CheckpointMetaData(42L, 1L),
-                CheckpointOptions.forCheckpointWithDefaultLocation(),
-                false);
+                CheckpointOptions.forCheckpointWithDefaultLocation());
 
         final Throwable uncaughtException = uncaughtExceptionHandler.waitForUncaughtException();
         assertThat(uncaughtException, is(failingCause));
@@ -752,8 +750,7 @@ public class StreamTaskTest extends TestLogger {
             streamTask
                     .triggerCheckpointAsync(
                             new CheckpointMetaData(42L, 1L),
-                            CheckpointOptions.forCheckpointWithDefaultLocation(),
-                            false)
+                            CheckpointOptions.forCheckpointWithDefaultLocation())
                     .get();
 
             // wait for the completion of the async task
@@ -868,8 +865,7 @@ public class StreamTaskTest extends TestLogger {
             final long checkpointId = 42L;
             streamTask.triggerCheckpointAsync(
                     new CheckpointMetaData(checkpointId, 1L),
-                    CheckpointOptions.forCheckpointWithDefaultLocation(),
-                    false);
+                    CheckpointOptions.forCheckpointWithDefaultLocation());
 
             acknowledgeCheckpointLatch.await();
 
@@ -958,8 +954,7 @@ public class StreamTaskTest extends TestLogger {
         final long checkpointId = 42L;
         task.streamTask.triggerCheckpointAsync(
                 new CheckpointMetaData(checkpointId, 1L),
-                CheckpointOptions.forCheckpointWithDefaultLocation(),
-                false);
+                CheckpointOptions.forCheckpointWithDefaultLocation());
 
         rawKeyedStateHandleFuture.awaitRun();
 
@@ -1044,8 +1039,7 @@ public class StreamTaskTest extends TestLogger {
 
             task.streamTask.triggerCheckpointAsync(
                     new CheckpointMetaData(42L, 1L),
-                    CheckpointOptions.forCheckpointWithDefaultLocation(),
-                    false);
+                    CheckpointOptions.forCheckpointWithDefaultLocation());
 
             checkpointCompletedLatch.await(30, TimeUnit.SECONDS);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index ba122d5..9b5452f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -108,9 +108,8 @@ public class SynchronousCheckpointITCase {
                     42,
                     156865867234L,
                     new CheckpointOptions(
-                            CheckpointType.SYNC_SAVEPOINT,
-                            CheckpointStorageLocationReference.getDefault()),
-                    true);
+                            CheckpointType.SAVEPOINT_SUSPEND,
+                            CheckpointStorageLocationReference.getDefault()));
 
             assertThat(eventQueue.take(), is(Event.PRE_TRIGGER_CHECKPOINT));
             assertThat(eventQueue.take(), is(Event.POST_TRIGGER_CHECKPOINT));
@@ -153,14 +152,11 @@ public class SynchronousCheckpointITCase {
 
         @Override
         public Future<Boolean> triggerCheckpointAsync(
-                CheckpointMetaData checkpointMetaData,
-                CheckpointOptions checkpointOptions,
-                boolean advanceToEndOfEventTime) {
+                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
             try {
                 eventQueue.put(Event.PRE_TRIGGER_CHECKPOINT);
                 Future<Boolean> result =
-                        super.triggerCheckpointAsync(
-                                checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+                        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
                 eventQueue.put(Event.POST_TRIGGER_CHECKPOINT);
                 return result;
             } catch (InterruptedException e) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
index 5d75d38..0a4949c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
@@ -111,9 +111,8 @@ public class SynchronousCheckpointTest {
         streamTaskUnderTest.triggerCheckpointAsync(
                 new CheckpointMetaData(42, System.currentTimeMillis()),
                 new CheckpointOptions(
-                        CheckpointType.SYNC_SAVEPOINT,
-                        CheckpointStorageLocationReference.getDefault()),
-                false);
+                        CheckpointType.SAVEPOINT_SUSPEND,
+                        CheckpointStorageLocationReference.getDefault()));
         waitForSyncSavepointIdToBeSet(streamTaskUnderTest);
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index 53a0612..f72d53a 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -351,19 +351,16 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
 
         @Override
         public Future<Boolean> triggerCheckpointAsync(
-                CheckpointMetaData checkpointMetaData,
-                CheckpointOptions checkpointOptions,
-                boolean advanceToEndOfEventTime) {
+                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
             final long checkpointId = checkpointMetaData.getCheckpointId();
             final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
 
-            if (checkpointType == CheckpointType.SYNC_SAVEPOINT) {
+            if (checkpointType == CheckpointType.SAVEPOINT_SUSPEND) {
                 synchronousSavepointId = checkpointId;
                 syncSavepointId.compareAndSet(-1, synchronousSavepointId);
             }
 
-            return super.triggerCheckpointAsync(
-                    checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
         }
 
         @Override
@@ -435,8 +432,7 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
         @Override
         public Future<Boolean> triggerCheckpointAsync(
                 final CheckpointMetaData checkpointMetaData,
-                final CheckpointOptions checkpointOptions,
-                final boolean advanceToEndOfEventTime) {
+                final CheckpointOptions checkpointOptions) {
             final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
             if (taskIndex == 0) {
                 checkpointsToWaitFor.countDown();
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index ecb9d5e..cd79163 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -239,8 +239,7 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase {
         @Override
         public Future<Boolean> triggerCheckpointAsync(
                 final CheckpointMetaData checkpointMetaData,
-                final CheckpointOptions checkpointOptions,
-                final boolean advanceToEndOfEventTime) {
+                final CheckpointOptions checkpointOptions) {
             final TaskStateSnapshot checkpointStateHandles = new TaskStateSnapshot();
             checkpointStateHandles.putSubtaskStateByOperatorID(
                     OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()),
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
index dd282e7..95886dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
@@ -183,9 +183,7 @@ public class StatefulOperatorChainedTaskTest {
 
         while (!streamTask
                 .triggerCheckpointAsync(
-                        checkpointMetaData,
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false)
+                        checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation())
                 .get()) {}
 
         testHarness.getTaskStateManager().getWaitForReportLatch().await();