You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/02/25 10:31:23 UTC

[flink] 02/03: [FLINK-21453][checkpointing][refactor] Replace advanceToEndOfTime with new CheckpointType.SAVEPOINT_TERMINATE

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7cb2154d7febe419d4d4b80900984c4132a90194
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Feb 23 20:34:00 2021 +0100

    [FLINK-21453][checkpointing][refactor] Replace advanceToEndOfTime with new CheckpointType.SAVEPOINT_TERMINATE
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 44 ++++++++--------------
 .../runtime/checkpoint/CheckpointProperties.java   | 19 +++++-----
 .../flink/runtime/checkpoint/CheckpointType.java   | 34 +++++++++++++----
 .../flink/runtime/dispatcher/Dispatcher.java       |  5 +--
 .../flink/runtime/executiongraph/Execution.java    | 27 ++++---------
 .../network/api/serialization/EventSerializer.java | 16 +++++---
 .../runtime/jobgraph/tasks/AbstractInvokable.java  |  6 +--
 .../jobmanager/slots/TaskManagerGateway.java       |  5 +--
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  6 +--
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  5 +--
 .../runtime/jobmaster/RpcTaskManagerGateway.java   |  9 +----
 .../flink/runtime/minicluster/MiniCluster.java     |  4 +-
 .../handler/job/savepoints/SavepointHandlers.java  |  4 +-
 .../flink/runtime/scheduler/SchedulerBase.java     |  4 +-
 .../flink/runtime/scheduler/SchedulerNG.java       |  3 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  9 ++---
 .../runtime/taskexecutor/TaskExecutorGateway.java  |  5 +--
 .../org/apache/flink/runtime/taskmanager/Task.java |  8 +---
 .../flink/runtime/webmonitor/RestfulGateway.java   |  5 +--
 .../checkpoint/CheckpointCoordinatorTest.java      | 15 +++-----
 .../CheckpointCoordinatorTestingUtils.java         |  3 +-
 .../CheckpointCoordinatorTriggeringTest.java       | 33 +++-------------
 .../checkpoint/CheckpointRequestDeciderTest.java   |  7 +---
 .../runtime/checkpoint/CheckpointTypeTest.java     |  2 +-
 .../runtime/checkpoint/PendingCheckpointTest.java  |  2 +-
 .../utils/SimpleAckingTaskManagerGateway.java      | 20 ++--------
 .../serialization/CheckpointSerializationTest.java |  2 +-
 .../api/serialization/EventSerializerTest.java     | 25 +++++++++++-
 .../jobmaster/utils/TestingJobMasterGateway.java   |  6 +--
 .../CoordinatorEventsExactlyOnceITCase.java        |  4 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    |  7 +---
 .../runtime/scheduler/TestingSchedulerNG.java      |  3 +-
 .../taskexecutor/TestingTaskExecutorGateway.java   |  3 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java     | 16 ++------
 .../runtime/webmonitor/TestingRestfulGateway.java  |  2 +-
 .../streaming/state/RocksDBAsyncSnapshotTest.java  |  6 +--
 .../runtime/tasks/SourceOperatorStreamTask.java    |  9 ++---
 .../streaming/runtime/tasks/SourceStreamTask.java  |  9 ++---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 28 ++++----------
 .../AbstractUdfStreamOperatorLifecycleTest.java    |  3 +-
 .../api/operators/async/AsyncWaitOperatorTest.java |  5 +--
 .../io/CheckpointBarrierAlignerTestBase.java       |  4 +-
 .../runtime/io/CheckpointSequenceValidator.java    |  4 +-
 .../runtime/tasks/OneInputStreamTaskTest.java      |  4 +-
 .../runtime/tasks/RestoreStreamTaskTest.java       |  2 +-
 .../tasks/SourceExternalCheckpointTriggerTest.java |  6 +--
 .../tasks/SourceOperatorStreamTaskTest.java        |  4 +-
 .../runtime/tasks/SourceStreamTaskTest.java        | 14 +++----
 .../runtime/tasks/SourceTaskTerminationTest.java   | 14 +++----
 .../tasks/StreamTaskExecutionDecorationTest.java   |  3 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |  3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 22 ++++-------
 .../runtime/tasks/SynchronousCheckpointITCase.java | 12 ++----
 .../runtime/tasks/SynchronousCheckpointTest.java   |  5 +--
 .../jobmaster/JobMasterStopWithSavepointIT.java    | 12 ++----
 .../jobmaster/JobMasterTriggerSavepointITCase.java |  3 +-
 .../state/StatefulOperatorChainedTaskTest.java     |  4 +-
 57 files changed, 210 insertions(+), 334 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 46da0d2..e0645fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointType.PostCheckpointAction;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -441,14 +442,13 @@ public class CheckpointCoordinator {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, false, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation);
     }
 
     /**
      * Triggers a synchronous savepoint with the given savepoint directory as a target.
      *
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers.
+     * @param terminate flag indicating if the job should terminate or just suspend
      * @param targetLocation Target location for the savepoint, optional. If null, the state
      *     backend's configured default will be used.
      * @return A future to the completed checkpoint
@@ -456,17 +456,16 @@ public class CheckpointCoordinator {
      *     savepoint directory has been configured
      */
     public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
-            final boolean advanceToEndOfEventTime, @Nullable final String targetLocation) {
+            final boolean terminate, @Nullable final String targetLocation) {
 
         final CheckpointProperties properties =
-                CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled);
+                CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, advanceToEndOfEventTime, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation);
     }
 
     private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
             final CheckpointProperties checkpointProperties,
-            final boolean advanceToEndOfEventTime,
             @Nullable final String targetLocation) {
 
         checkNotNull(checkpointProperties);
@@ -476,11 +475,7 @@ public class CheckpointCoordinator {
         final CompletableFuture<CompletedCheckpoint> resultFuture = new CompletableFuture<>();
         timer.execute(
                 () ->
-                        triggerCheckpoint(
-                                        checkpointProperties,
-                                        targetLocation,
-                                        false,
-                                        advanceToEndOfEventTime)
+                        triggerCheckpoint(checkpointProperties, targetLocation, false)
                                 .whenComplete(
                                         (completedCheckpoint, throwable) -> {
                                             if (throwable == null) {
@@ -502,25 +497,24 @@ public class CheckpointCoordinator {
      * @return a future to the completed checkpoint.
      */
     public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic) {
-        return triggerCheckpoint(checkpointProperties, null, isPeriodic, false);
+        return triggerCheckpoint(checkpointProperties, null, isPeriodic);
     }
 
     @VisibleForTesting
     public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
             CheckpointProperties props,
             @Nullable String externalSavepointLocation,
-            boolean isPeriodic,
-            boolean advanceToEndOfTime) {
+            boolean isPeriodic) {
 
-        if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
+        if (props.getCheckpointType().getPostCheckpointAction() == PostCheckpointAction.TERMINATE
+                && !(props.isSynchronous() && props.isSavepoint())) {
             return FutureUtils.completedExceptionally(
                     new IllegalArgumentException(
                             "Only synchronous savepoints are allowed to advance the watermark to MAX."));
         }
 
         CheckpointTriggerRequest request =
-                new CheckpointTriggerRequest(
-                        props, externalSavepointLocation, isPeriodic, advanceToEndOfTime);
+                new CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic);
         chooseRequestToExecute(request).ifPresent(this::startTriggeringCheckpoint);
         return request.onCompletionPromise;
     }
@@ -619,8 +613,7 @@ public class CheckpointCoordinator {
                                                         checkpointId,
                                                         checkpoint.getCheckpointStorageLocation(),
                                                         request.props,
-                                                        executions,
-                                                        request.advanceToEndOfTime);
+                                                        executions);
 
                                                 coordinatorsToCheckpoint.forEach(
                                                         (ctx) ->
@@ -820,8 +813,7 @@ public class CheckpointCoordinator {
             long checkpointID,
             CheckpointStorageLocation checkpointStorageLocation,
             CheckpointProperties props,
-            Execution[] executions,
-            boolean advanceToEndOfTime) {
+            Execution[] executions) {
 
         final CheckpointOptions checkpointOptions =
                 new CheckpointOptions(
@@ -834,8 +826,7 @@ public class CheckpointCoordinator {
         // send the messages to the tasks that trigger their checkpoint
         for (Execution execution : executions) {
             if (props.isSynchronous()) {
-                execution.triggerSynchronousSavepoint(
-                        checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
+                execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions);
             } else {
                 execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
             }
@@ -2037,21 +2028,18 @@ public class CheckpointCoordinator {
         final CheckpointProperties props;
         final @Nullable String externalSavepointLocation;
         final boolean isPeriodic;
-        final boolean advanceToEndOfTime;
         private final CompletableFuture<CompletedCheckpoint> onCompletionPromise =
                 new CompletableFuture<>();
 
         CheckpointTriggerRequest(
                 CheckpointProperties props,
                 @Nullable String externalSavepointLocation,
-                boolean isPeriodic,
-                boolean advanceToEndOfTime) {
+                boolean isPeriodic) {
 
             this.timestamp = System.currentTimeMillis();
             this.props = checkNotNull(props);
             this.externalSavepointLocation = externalSavepointLocation;
             this.isPeriodic = isPeriodic;
-            this.advanceToEndOfTime = advanceToEndOfTime;
         }
 
         CompletableFuture<CompletedCheckpoint> getOnCompletionFuture() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 96162a2..91c9374 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -238,14 +238,6 @@ public class CheckpointProperties implements Serializable {
     //  Factories and pre-configured properties
     // ------------------------------------------------------------------------
 
-    private static final CheckpointProperties SYNC_SAVEPOINT =
-            new CheckpointProperties(
-                    true, CheckpointType.SYNC_SAVEPOINT, false, false, false, false, false);
-
-    private static final CheckpointProperties SYNC_SAVEPOINT_NOT_FORCED =
-            new CheckpointProperties(
-                    false, CheckpointType.SYNC_SAVEPOINT, false, false, false, false, false);
-
     private static final CheckpointProperties SAVEPOINT =
             new CheckpointProperties(
                     true, CheckpointType.SAVEPOINT, false, false, false, false, false);
@@ -296,8 +288,15 @@ public class CheckpointProperties implements Serializable {
         return forced ? SAVEPOINT : SAVEPOINT_NO_FORCE;
     }
 
-    public static CheckpointProperties forSyncSavepoint(boolean forced) {
-        return forced ? SYNC_SAVEPOINT : SYNC_SAVEPOINT_NOT_FORCED;
+    public static CheckpointProperties forSyncSavepoint(boolean forced, boolean terminate) {
+        return new CheckpointProperties(
+                forced,
+                terminate ? CheckpointType.SAVEPOINT_TERMINATE : CheckpointType.SAVEPOINT_SUSPEND,
+                false,
+                false,
+                false,
+                false,
+                false);
     }
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
index a1df789..02c5c1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
@@ -22,22 +22,25 @@ package org.apache.flink.runtime.checkpoint;
 public enum CheckpointType {
 
     /** A checkpoint, full or incremental. */
-    CHECKPOINT(false, false),
+    CHECKPOINT(false, PostCheckpointAction.NONE),
 
     /** A regular savepoint. */
-    SAVEPOINT(true, false),
+    SAVEPOINT(true, PostCheckpointAction.NONE),
 
-    /** A savepoint taken while suspending/terminating the job. */
-    SYNC_SAVEPOINT(true, true);
+    /** A savepoint taken while suspending the job. */
+    SAVEPOINT_SUSPEND(true, PostCheckpointAction.SUSPEND),
+
+    /** A savepoint taken while terminating the job. */
+    SAVEPOINT_TERMINATE(true, PostCheckpointAction.TERMINATE);
 
     private final boolean isSavepoint;
 
-    private final boolean isSynchronous;
+    private final PostCheckpointAction postCheckpointAction;
 
-    CheckpointType(final boolean isSavepoint, final boolean isSynchronous) {
+    CheckpointType(final boolean isSavepoint, final PostCheckpointAction postCheckpointAction) {
 
         this.isSavepoint = isSavepoint;
-        this.isSynchronous = isSynchronous;
+        this.postCheckpointAction = postCheckpointAction;
     }
 
     public boolean isSavepoint() {
@@ -45,6 +48,21 @@ public enum CheckpointType {
     }
 
     public boolean isSynchronous() {
-        return isSynchronous;
+        return postCheckpointAction != PostCheckpointAction.NONE;
+    }
+
+    public PostCheckpointAction getPostCheckpointAction() {
+        return postCheckpointAction;
+    }
+
+    public boolean shouldAdvanceToEndOfTime() {
+        return getPostCheckpointAction() == PostCheckpointAction.TERMINATE;
+    }
+
+    /** What's the intended action after the checkpoint (relevant for stopping with savepoint). */
+    public enum PostCheckpointAction {
+        NONE,
+        SUSPEND,
+        TERMINATE
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 18939bb..a586f59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -729,15 +729,14 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     public CompletableFuture<String> stopWithSavepoint(
             final JobID jobId,
             final String targetDirectory,
-            final boolean advanceToEndOfEventTime,
+            final boolean terminate,
             final Time timeout) {
         final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
                 getJobMasterGatewayFuture(jobId);
 
         return jobMasterGatewayFuture.thenCompose(
                 (JobMasterGateway jobMasterGateway) ->
-                        jobMasterGateway.stopWithSavepoint(
-                                targetDirectory, advanceToEndOfEventTime, timeout));
+                        jobMasterGateway.stopWithSavepoint(targetDirectory, terminate, timeout));
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 50f05ac..8fa4e61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.CheckpointType.PostCheckpointAction;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -1111,7 +1112,7 @@ public class Execution
      */
     public void triggerCheckpoint(
             long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
-        triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions, false);
+        triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions);
     }
 
     /**
@@ -1120,26 +1121,17 @@ public class Execution
      * @param checkpointId of th checkpoint to trigger
      * @param timestamp of the checkpoint to trigger
      * @param checkpointOptions of the checkpoint to trigger
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers
      */
     public void triggerSynchronousSavepoint(
-            long checkpointId,
-            long timestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
-        triggerCheckpointHelper(
-                checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
+            long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
+        triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions);
     }
 
     private void triggerCheckpointHelper(
-            long checkpointId,
-            long timestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
 
         final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
-        if (advanceToEndOfEventTime
+        if (checkpointType.getPostCheckpointAction() == PostCheckpointAction.TERMINATE
                 && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
             throw new IllegalArgumentException(
                     "Only synchronous savepoints are allowed to advance the watermark to MAX.");
@@ -1151,12 +1143,7 @@ public class Execution
             final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
             taskManagerGateway.triggerCheckpoint(
-                    attemptId,
-                    getVertex().getJobId(),
-                    checkpointId,
-                    timestamp,
-                    checkpointOptions,
-                    advanceToEndOfEventTime);
+                    attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
         } else {
             LOG.debug(
                     "The execution has no slot assigned. This indicates that the execution is no longer running.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 210d7c7..f0a3cea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -64,7 +64,9 @@ public class EventSerializer {
 
     private static final int CHECKPOINT_TYPE_SAVEPOINT = 1;
 
-    private static final int CHECKPOINT_TYPE_SYNC_SAVEPOINT = 2;
+    private static final int CHECKPOINT_TYPE_SAVEPOINT_SUSPEND = 2;
+
+    private static final int CHECKPOINT_TYPE_SAVEPOINT_TERMINATE = 3;
 
     // ------------------------------------------------------------------------
     //  Serialization Logic
@@ -220,8 +222,10 @@ public class EventSerializer {
             typeInt = CHECKPOINT_TYPE_CHECKPOINT;
         } else if (checkpointType == CheckpointType.SAVEPOINT) {
             typeInt = CHECKPOINT_TYPE_SAVEPOINT;
-        } else if (checkpointType == CheckpointType.SYNC_SAVEPOINT) {
-            typeInt = CHECKPOINT_TYPE_SYNC_SAVEPOINT;
+        } else if (checkpointType == CheckpointType.SAVEPOINT_SUSPEND) {
+            typeInt = CHECKPOINT_TYPE_SAVEPOINT_SUSPEND;
+        } else if (checkpointType == CheckpointType.SAVEPOINT_TERMINATE) {
+            typeInt = CHECKPOINT_TYPE_SAVEPOINT_TERMINATE;
         } else {
             throw new IOException("Unknown checkpoint type: " + checkpointType);
         }
@@ -257,8 +261,10 @@ public class EventSerializer {
             checkpointType = CheckpointType.CHECKPOINT;
         } else if (checkpointTypeCode == CHECKPOINT_TYPE_SAVEPOINT) {
             checkpointType = CheckpointType.SAVEPOINT;
-        } else if (checkpointTypeCode == CHECKPOINT_TYPE_SYNC_SAVEPOINT) {
-            checkpointType = CheckpointType.SYNC_SAVEPOINT;
+        } else if (checkpointTypeCode == CHECKPOINT_TYPE_SAVEPOINT_SUSPEND) {
+            checkpointType = CheckpointType.SAVEPOINT_SUSPEND;
+        } else if (checkpointTypeCode == CHECKPOINT_TYPE_SAVEPOINT_TERMINATE) {
+            checkpointType = CheckpointType.SAVEPOINT_TERMINATE;
         } else {
             throw new IOException("Unknown checkpoint type code: " + checkpointTypeCode);
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 22fbb8a..c98ed6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -209,15 +209,11 @@ public abstract class AbstractInvokable {
      *
      * @param checkpointMetaData Meta data for about this checkpoint
      * @param checkpointOptions Options for performing this checkpoint
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers
      * @return future with value of {@code false} if the checkpoint was not carried out, {@code
      *     true} otherwise
      */
     public Future<Boolean> triggerCheckpointAsync(
-            CheckpointMetaData checkpointMetaData,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
         throw new UnsupportedOperationException(
                 String.format(
                         "triggerCheckpointAsync not supported by %s", this.getClass().getName()));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index d209481..c115f2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -127,16 +127,13 @@ public interface TaskManagerGateway extends TaskExecutorOperatorEventGateway {
      * @param checkpointId of the checkpoint to trigger
      * @param timestamp of the checkpoint to trigger
      * @param checkpointOptions of the checkpoint to trigger
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers
      */
     void triggerCheckpoint(
             ExecutionAttemptID executionAttemptID,
             JobID jobId,
             long checkpointId,
             long timestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime);
+            CheckpointOptions checkpointOptions);
 
     /**
      * Frees the slot with the given allocation ID.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 428a0cc..6cae89f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -725,11 +725,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
 
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            @Nullable final String targetDirectory,
-            final boolean advanceToEndOfEventTime,
-            final Time timeout) {
+            @Nullable final String targetDirectory, final boolean terminate, final Time timeout) {
 
-        return schedulerNG.stopWithSavepoint(targetDirectory, advanceToEndOfEventTime);
+        return schedulerNG.stopWithSavepoint(targetDirectory, terminate);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index b3345dc..6237985 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -234,14 +234,13 @@ public interface JobMasterGateway
      *
      * @param targetDirectory to which to write the savepoint data or null if the default savepoint
      *     directory should be used
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers
+     * @param terminate flag indicating if the job should terminate or just suspend
      * @param timeout for the rpc call
      * @return Future which is completed with the savepoint path once completed
      */
     CompletableFuture<String> stopWithSavepoint(
             @Nullable final String targetDirectory,
-            final boolean advanceToEndOfEventTime,
+            final boolean terminate,
             @RpcTimeout final Time timeout);
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index a7eb15f..714110e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -105,14 +105,9 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
             JobID jobId,
             long checkpointId,
             long timestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointOptions checkpointOptions) {
         taskExecutorGateway.triggerCheckpoint(
-                executionAttemptID,
-                checkpointId,
-                timestamp,
-                checkpointOptions,
-                advanceToEndOfEventTime);
+                executionAttemptID, checkpointId, timestamp, checkpointOptions);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 93335bd..21da08f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -668,11 +668,11 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
     }
 
     public CompletableFuture<String> stopWithSavepoint(
-            JobID jobId, String targetDirectory, boolean advanceToEndOfEventTime) {
+            JobID jobId, String targetDirectory, boolean terminate) {
         return runDispatcherCommand(
                 dispatcherGateway ->
                         dispatcherGateway.stopWithSavepoint(
-                                jobId, targetDirectory, advanceToEndOfEventTime, rpcTimeout));
+                                jobId, targetDirectory, terminate, rpcTimeout));
     }
 
     public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
index 8d9f710..35bfe85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
@@ -167,13 +167,13 @@ public class SavepointHandlers
                         HttpResponseStatus.BAD_REQUEST);
             }
 
-            final boolean advanceToEndOfEventTime = request.getRequestBody().shouldDrain();
+            final boolean shouldDrain = request.getRequestBody().shouldDrain();
             final String targetDirectory =
                     requestedTargetDirectory != null
                             ? requestedTargetDirectory
                             : defaultSavepointDir;
             return gateway.stopWithSavepoint(
-                    jobId, targetDirectory, advanceToEndOfEventTime, RpcUtils.INF_TIMEOUT);
+                    jobId, targetDirectory, shouldDrain, RpcUtils.INF_TIMEOUT);
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 1396c6d..c88ac57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -1045,7 +1045,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            final String targetDirectory, final boolean advanceToEndOfEventTime) {
+            final String targetDirectory, final boolean terminate) {
         mainThreadExecutor.assertRunningInMainThread();
 
         final CheckpointCoordinator checkpointCoordinator =
@@ -1082,7 +1082,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
-                        .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
+                        .triggerSynchronousSavepoint(terminate, targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
 
         final CompletableFuture<JobStatus> terminationFuture =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 06e74e6..e2fd16b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -146,8 +146,7 @@ public interface SchedulerNG {
 
     void declineCheckpoint(DeclineCheckpoint decline);
 
-    CompletableFuture<String> stopWithSavepoint(
-            String targetDirectory, boolean advanceToEndOfEventTime);
+    CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate);
 
     // ------------------------------------------------------------------------
     //  Operator Coordinator related methods
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 7e28e6e..bd224e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.CheckpointType.PostCheckpointAction;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -887,8 +888,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
             ExecutionAttemptID executionAttemptID,
             long checkpointId,
             long checkpointTimestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointOptions checkpointOptions) {
         log.debug(
                 "Trigger checkpoint {}@{} for {}.",
                 checkpointId,
@@ -896,7 +896,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
                 executionAttemptID);
 
         final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
-        if (advanceToEndOfEventTime
+        if (checkpointType.getPostCheckpointAction() == PostCheckpointAction.TERMINATE
                 && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
             throw new IllegalArgumentException(
                     "Only synchronous savepoints are allowed to advance the watermark to MAX.");
@@ -905,8 +905,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
         final Task task = taskSlotTable.getTask(executionAttemptID);
 
         if (task != null) {
-            task.triggerCheckpointBarrier(
-                    checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);
+            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
 
             return CompletableFuture.completedFuture(Acknowledge.get());
         } else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 8e59cee..829018d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -133,16 +133,13 @@ public interface TaskExecutorGateway extends RpcGateway, TaskExecutorOperatorEve
      * @param checkpointID unique id for the checkpoint
      * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
      * @param checkpointOptions for performing the checkpoint
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers
      * @return Future acknowledge if the checkpoint has been successfully triggered
      */
     CompletableFuture<Acknowledge> triggerCheckpoint(
             ExecutionAttemptID executionAttemptID,
             long checkpointID,
             long checkpointTimestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime);
+            CheckpointOptions checkpointOptions);
 
     /**
      * Confirm a checkpoint for the given task. The checkpoint is identified by the checkpoint ID
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index dbfdba5..d48a750 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1240,14 +1240,11 @@ public class Task
      * @param checkpointID The ID identifying the checkpoint.
      * @param checkpointTimestamp The timestamp associated with the checkpoint.
      * @param checkpointOptions Options for performing this checkpoint.
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers.
      */
     public void triggerCheckpointBarrier(
             final long checkpointID,
             final long checkpointTimestamp,
-            final CheckpointOptions checkpointOptions,
-            final boolean advanceToEndOfEventTime) {
+            final CheckpointOptions checkpointOptions) {
 
         final AbstractInvokable invokable = this.invokable;
         final CheckpointMetaData checkpointMetaData =
@@ -1255,8 +1252,7 @@ public class Task
 
         if (executionState == ExecutionState.RUNNING && invokable != null) {
             try {
-                invokable.triggerCheckpointAsync(
-                        checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+                invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
             } catch (RejectedExecutionException ex) {
                 // This may happen if the mailbox is closed. It means that the task is shutting
                 // down, so we just ignore it.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 7a54ae9..3aea713 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -135,15 +135,14 @@ public interface RestfulGateway extends RpcGateway {
      * @param jobId ID of the job for which the savepoint should be triggered.
      * @param targetDirectory to which to write the savepoint data or null if the default savepoint
      *     directory should be used
-     * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
-     *     MAX_WATERMARK} in the pipeline to fire any registered event-time timers
+     * @param terminate flag indicating if the job should terminate or just suspend
      * @param timeout for the rpc call
      * @return Future which is completed with the savepoint path once completed
      */
     default CompletableFuture<String> stopWithSavepoint(
             final JobID jobId,
             final String targetDirectory,
-            final boolean advanceToEndOfEventTime,
+            final boolean terminate,
             @RpcTimeout final Time timeout) {
         throw new UnsupportedOperationException();
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 6379c8b..b2aa6db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2694,8 +2694,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                             CheckpointProperties.forCheckpoint(
                                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                             null,
-                            true,
-                            false);
+                            true);
             manuallyTriggeredScheduledExecutor.triggerAll();
             try {
                 onCompletionPromise.get();
@@ -2789,8 +2788,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                 jid,
                                 checkpointId,
                                 timestamp,
-                                checkpointOptions,
-                                advanceToEndOfEventTime) -> {});
+                                checkpointOptions) -> {});
         ExecutionVertex vertex2 =
                 mockExecutionVertex(
                         attemptID2,
@@ -2798,8 +2796,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                 jid,
                                 checkpointId,
                                 timestamp,
-                                checkpointOptions,
-                                advanceToEndOfEventTime) -> {});
+                                checkpointOptions) -> {});
 
         OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
         OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
@@ -2953,8 +2950,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                 jid,
                                 checkpointId,
                                 timestamp,
-                                checkpointOptions,
-                                advanceToEndOfEventTime) -> {});
+                                checkpointOptions) -> {});
         ExecutionVertex vertex2 =
                 mockExecutionVertex(
                         attemptID2,
@@ -2962,8 +2958,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                 jid,
                                 checkpointId,
                                 timestamp,
-                                checkpointOptions,
-                                advanceToEndOfEventTime) -> {});
+                                checkpointOptions) -> {});
 
         OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
         OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 0ae6797..b573669 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -583,8 +583,7 @@ public class CheckpointCoordinatorTestingUtils {
                                     jobId,
                                     (long) args[0],
                                     (long) args[1],
-                                    (CheckpointOptions) args[2],
-                                    false);
+                                    (CheckpointOptions) args[2]);
                             return null;
                         })
                 .when(mock)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 5509e71..db9f6b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -291,7 +291,6 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
                         CheckpointProperties.forCheckpoint(
                                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                         null,
-                        false,
                         false);
         manuallyTriggeredScheduledExecutor.triggerAll();
         assertFalse(onCompletionPromise2.isCompletedExceptionally());
@@ -326,12 +325,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         final ExecutionAttemptID attemptID = new ExecutionAttemptID();
         final AtomicInteger taskManagerCheckpointTriggeredTimes = new AtomicInteger(0);
         final SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer =
-                (executionAttemptID,
-                        jobId,
-                        checkpointId,
-                        timestamp,
-                        checkpointOptions,
-                        advanceToEndOfEventTime) ->
+                (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) ->
                         taskManagerCheckpointTriggeredTimes.incrementAndGet();
         ExecutionVertex vertex = mockExecutionVertex(attemptID, checkpointConsumer);
 
@@ -365,12 +359,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         final ExecutionAttemptID attemptID = new ExecutionAttemptID();
         final AtomicInteger taskManagerCheckpointTriggeredTimes = new AtomicInteger(0);
         final SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer =
-                (executionAttemptID,
-                        jobId,
-                        checkpointId,
-                        timestamp,
-                        checkpointOptions,
-                        advanceToEndOfEventTime) ->
+                (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) ->
                         taskManagerCheckpointTriggeredTimes.incrementAndGet();
         ExecutionVertex vertex = mockExecutionVertex(attemptID, checkpointConsumer);
 
@@ -415,12 +404,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         final ExecutionAttemptID attemptID = new ExecutionAttemptID();
         final AtomicInteger taskManagerCheckpointTriggeredTimes = new AtomicInteger(0);
         final SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer =
-                (executionAttemptID,
-                        jobId,
-                        checkpointId,
-                        timestamp,
-                        checkpointOptions,
-                        advanceToEndOfEventTime) ->
+                (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) ->
                         taskManagerCheckpointTriggeredTimes.incrementAndGet();
         ExecutionVertex vertex = mockExecutionVertex(attemptID, checkpointConsumer);
 
@@ -513,12 +497,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         final ExecutionAttemptID attemptID = new ExecutionAttemptID();
         final AtomicInteger taskManagerCheckpointTriggeredTimes = new AtomicInteger(0);
         final SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer =
-                (executionAttemptID,
-                        jobId,
-                        checkpointId,
-                        timestamp,
-                        checkpointOptions,
-                        advanceToEndOfEventTime) ->
+                (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) ->
                         taskManagerCheckpointTriggeredTimes.incrementAndGet();
         ExecutionVertex vertex = mockExecutionVertex(attemptID, checkpointConsumer);
 
@@ -625,8 +604,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
                 CheckpointProperties.forCheckpoint(
                         CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                 null,
-                true,
-                false);
+                true);
     }
 
     private CompletableFuture<CompletedCheckpoint> triggerNonPeriodicCheckpoint(
@@ -636,7 +614,6 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
                 CheckpointProperties.forCheckpoint(
                         CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                 null,
-                false,
                 false);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
index c380835..d34d798 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
@@ -275,14 +275,11 @@ public class CheckpointRequestDeciderTest {
 
     private static CheckpointTriggerRequest savepointRequest(boolean force, boolean periodic) {
         return new CheckpointTriggerRequest(
-                CheckpointProperties.forSavepoint(force), null, periodic, false);
+                CheckpointProperties.forSavepoint(force), null, periodic);
     }
 
     private static CheckpointTriggerRequest checkpointRequest(boolean periodic) {
         return new CheckpointTriggerRequest(
-                CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
-                null,
-                periodic,
-                false);
+                CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION), null, periodic);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
index 227f262..481559b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
@@ -36,6 +36,6 @@ public class CheckpointTypeTest {
     public void testOrdinalsAreConstant() {
         assertEquals(0, CheckpointType.CHECKPOINT.ordinal());
         assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
-        assertEquals(2, CheckpointType.SYNC_SAVEPOINT.ordinal());
+        assertEquals(2, CheckpointType.SAVEPOINT_SUSPEND.ordinal());
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 339cca9..9635da3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -129,7 +129,7 @@ public class PendingCheckpointTest {
     @Test
     public void testSyncSavepointCannotBeSubsumed() throws Exception {
         // Forced checkpoints cannot be subsumed
-        CheckpointProperties forced = CheckpointProperties.forSyncSavepoint(true);
+        CheckpointProperties forced = CheckpointProperties.forSyncSavepoint(true, false);
         PendingCheckpoint pending = createPendingCheckpoint(forced);
         assertFalse(pending.canBeSubsumed());
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 9e8903d..e16c209 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -61,12 +61,7 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
             (ignore1, ignore2) -> {};
 
     private CheckpointConsumer checkpointConsumer =
-            (executionAttemptID,
-                    jobId,
-                    checkpointId,
-                    timestamp,
-                    checkpointOptions,
-                    advanceToEndOfEventTime) -> {};
+            (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) -> {};
 
     public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> submitConsumer) {
         this.submitConsumer = submitConsumer;
@@ -147,16 +142,10 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
             JobID jobId,
             long checkpointId,
             long timestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointOptions checkpointOptions) {
 
         checkpointConsumer.accept(
-                executionAttemptID,
-                jobId,
-                checkpointId,
-                timestamp,
-                checkpointOptions,
-                advanceToEndOfEventTime);
+                executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions);
     }
 
     @Override
@@ -187,7 +176,6 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
                 JobID jobId,
                 long checkpointId,
                 long timestamp,
-                CheckpointOptions checkpointOptions,
-                boolean advanceToEndOfEventTime);
+                CheckpointOptions checkpointOptions);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java
index 0c698c6..855f3e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java
@@ -44,7 +44,7 @@ public class CheckpointSerializationTest {
     public void testSuspendingCheckpointBarrierSerialization() throws Exception {
         CheckpointOptions suspendSavepointToSerialize =
                 new CheckpointOptions(
-                        CheckpointType.SYNC_SAVEPOINT,
+                        CheckpointType.SAVEPOINT_SUSPEND,
                         new CheckpointStorageLocationReference(STORAGE_LOCATION_REF));
         testCheckpointBarrierSerialization(suspendSavepointToSerialize);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 842e7ce..6cf6b76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -27,6 +28,7 @@ import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 
 import org.junit.Test;
 
@@ -47,7 +49,28 @@ public class EventSerializerTest {
         EndOfPartitionEvent.INSTANCE,
         EndOfSuperstepEvent.INSTANCE,
         new CheckpointBarrier(
-                1678L, 4623784L, CheckpointOptions.forCheckpointWithDefaultLocation()),
+                1678L,
+                4623784L,
+                new CheckpointOptions(
+                        CheckpointType.CHECKPOINT,
+                        CheckpointStorageLocationReference.getDefault())),
+        new CheckpointBarrier(
+                1678L,
+                4623784L,
+                new CheckpointOptions(
+                        CheckpointType.SAVEPOINT, CheckpointStorageLocationReference.getDefault())),
+        new CheckpointBarrier(
+                1678L,
+                4623784L,
+                new CheckpointOptions(
+                        CheckpointType.SAVEPOINT_SUSPEND,
+                        CheckpointStorageLocationReference.getDefault())),
+        new CheckpointBarrier(
+                1678L,
+                4623784L,
+                new CheckpointOptions(
+                        CheckpointType.SAVEPOINT_TERMINATE,
+                        CheckpointStorageLocationReference.getDefault())),
         new TestTaskEvent(Math.random(), 12361231273L),
         new CancelCheckpointMarker(287087987329842L)
     };
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 2ca5f9d..3f6e6fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -400,10 +400,8 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            @Nullable final String targetDirectory,
-            final boolean advanceToEndOfEventTime,
-            final Time timeout) {
-        return stopWithSavepointFunction.apply(targetDirectory, advanceToEndOfEventTime);
+            @Nullable final String targetDirectory, final boolean terminate, final Time timeout) {
+        return stopWithSavepointFunction.apply(targetDirectory, terminate);
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index c1732ec..969368a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -425,9 +425,7 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
 
         @Override
         public Future<Boolean> triggerCheckpointAsync(
-                CheckpointMetaData checkpointMetaData,
-                CheckpointOptions checkpointOptions,
-                boolean advanceToEndOfEventTime) {
+                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
             actions.add(checkpointMetaData); // this signals the main thread should do a checkpoint
             return CompletableFuture.completedFuture(true);
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 26fa24d..08fa081 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -919,12 +919,7 @@ public class DefaultSchedulerTest extends TestLogger {
                 .getLogicalSlotBuilder()
                 .setTaskManagerGateway(taskManagerGateway);
         taskManagerGateway.setCheckpointConsumer(
-                (executionAttemptID,
-                        jobId,
-                        checkpointId,
-                        timestamp,
-                        checkpointOptions,
-                        advanceToEndOfEventTime) -> {
+                (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) -> {
                     checkpointTriggeredLatch.countDown();
                 });
         return checkpointTriggeredLatch;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 67cbada..ab07a1c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -203,8 +203,7 @@ public class TestingSchedulerNG implements SchedulerNG {
     }
 
     @Override
-    public CompletableFuture<String> stopWithSavepoint(
-            String targetDirectory, boolean advanceToEndOfEventTime) {
+    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
         failOperation();
         return null;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 9c37d1f..062a324 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -212,8 +212,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
             ExecutionAttemptID executionAttemptID,
             long checkpointID,
             long checkpointTimestamp,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointOptions checkpointOptions) {
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 94044a0..d769a5f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -83,7 +83,7 @@ public class TaskAsyncCallTest extends TestLogger {
 
     /**
      * Triggered when {@link CheckpointsInOrderInvokable#triggerCheckpointAsync(CheckpointMetaData,
-     * CheckpointOptions, boolean)} was called {@link #numCalls} times.
+     * CheckpointOptions)} was called {@link #numCalls} times.
      */
     private static OneShotLatch triggerLatch;
 
@@ -121,10 +121,7 @@ public class TaskAsyncCallTest extends TestLogger {
 
             for (int i = 1; i <= numCalls; i++) {
                 task.triggerCheckpointBarrier(
-                        i,
-                        156865867234L,
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false);
+                        i, 156865867234L, CheckpointOptions.forCheckpointWithDefaultLocation());
             }
 
             triggerLatch.await();
@@ -147,10 +144,7 @@ public class TaskAsyncCallTest extends TestLogger {
 
             for (int i = 1; i <= numCalls; i++) {
                 task.triggerCheckpointBarrier(
-                        i,
-                        156865867234L,
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false);
+                        i, 156865867234L, CheckpointOptions.forCheckpointWithDefaultLocation());
                 task.notifyCheckpointComplete(i);
             }
 
@@ -260,9 +254,7 @@ public class TaskAsyncCallTest extends TestLogger {
 
         @Override
         public Future<Boolean> triggerCheckpointAsync(
-                CheckpointMetaData checkpointMetaData,
-                CheckpointOptions checkpointOptions,
-                boolean advanceToEndOfEventTime) {
+                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
             lastCheckpointId++;
             if (checkpointMetaData.getCheckpointId() == lastCheckpointId) {
                 if (lastCheckpointId == numCalls) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index 8c83ff5..7ff7ac8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -264,7 +264,7 @@ public class TestingRestfulGateway implements RestfulGateway {
 
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            JobID jobId, String targetDirectory, boolean advanceToEndOfTime, Time timeout) {
+            JobID jobId, String targetDirectory, boolean terminate, Time timeout) {
         return stopWithSavepointFunction.apply(jobId, targetDirectory);
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 934715b..5532eff 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -220,8 +220,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 
         task.triggerCheckpointAsync(
                         new CheckpointMetaData(42, 17),
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false)
+                        CheckpointOptions.forCheckpointWithDefaultLocation())
                 .get();
 
         testHarness.processElement(new StreamRecord<>("Wohoo", 0));
@@ -342,8 +341,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 
         task.triggerCheckpointAsync(
                         new CheckpointMetaData(42, 17),
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false)
+                        CheckpointOptions.forCheckpointWithDefaultLocation())
                 .get();
 
         testHarness.processElement(new StreamRecord<>("Wohoo", 0));
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index 54ffac2..8181e02 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -91,12 +91,9 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
 
     @Override
     public Future<Boolean> triggerCheckpointAsync(
-            CheckpointMetaData checkpointMetaData,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
         if (!isExternallyInducedSource) {
-            return super.triggerCheckpointAsync(
-                    checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
         } else {
             return CompletableFuture.completedFuture(isRunning());
         }
@@ -127,7 +124,7 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
         final CheckpointMetaData checkpointMetaData =
                 new CheckpointMetaData(checkpointId, timestamp);
 
-        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, false);
+        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
     }
 
     // ---------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index db1fba9..42b1ccf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -111,7 +111,7 @@ public class SourceStreamTask<
                             try {
                                 SourceStreamTask.super
                                         .triggerCheckpointAsync(
-                                                checkpointMetaData, checkpointOptions, false)
+                                                checkpointMetaData, checkpointOptions)
                                         .get();
                             } catch (RuntimeException e) {
                                 throw e;
@@ -217,12 +217,9 @@ public class SourceStreamTask<
 
     @Override
     public Future<Boolean> triggerCheckpointAsync(
-            CheckpointMetaData checkpointMetaData,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
         if (!externallyInducedCheckpoints) {
-            return super.triggerCheckpointAsync(
-                    checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
         } else {
             // we do not trigger checkpoints here, we simply state whether we can trigger them
             synchronized (lock) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 32b8612..573bd31 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -850,9 +850,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
 
     @Override
     public Future<Boolean> triggerCheckpointAsync(
-            CheckpointMetaData checkpointMetaData,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
 
         CompletableFuture<Boolean> result = new CompletableFuture<>();
         mainMailboxExecutor.execute(
@@ -864,11 +862,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                                             System.currentTimeMillis()
                                                     - checkpointMetaData.getTimestamp());
                     try {
-                        result.complete(
-                                triggerCheckpoint(
-                                        checkpointMetaData,
-                                        checkpointOptions,
-                                        advanceToEndOfEventTime));
+                        result.complete(triggerCheckpoint(checkpointMetaData, checkpointOptions));
                     } catch (Exception ex) {
                         // Report the failure both via the Future result but also to the mailbox
                         result.completeExceptionally(ex);
@@ -882,9 +876,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
     }
 
     private boolean triggerCheckpoint(
-            CheckpointMetaData checkpointMetaData,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime)
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
             throws Exception {
         try {
             // No alignment if we inject a checkpoint
@@ -895,11 +887,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                     checkpointMetaData.getCheckpointId(), checkpointOptions);
 
             boolean success =
-                    performCheckpoint(
-                            checkpointMetaData,
-                            checkpointOptions,
-                            checkpointMetrics,
-                            advanceToEndOfEventTime);
+                    performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
             if (!success) {
                 declineCheckpoint(checkpointMetaData.getCheckpointId());
             }
@@ -945,8 +933,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
             throws IOException {
 
         try {
-            if (performCheckpoint(
-                    checkpointMetaData, checkpointOptions, checkpointMetrics, false)) {
+            if (performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics)) {
                 if (isSynchronousSavepointId(checkpointMetaData.getCheckpointId())) {
                     runSynchronousSavepointMailboxLoop();
                 }
@@ -977,8 +964,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
     private boolean performCheckpoint(
             CheckpointMetaData checkpointMetaData,
             CheckpointOptions checkpointOptions,
-            CheckpointMetrics checkpointMetrics,
-            boolean advanceToEndOfTime)
+            CheckpointMetrics checkpointMetrics)
             throws Exception {
 
         LOG.debug(
@@ -993,7 +979,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                         if (checkpointOptions.getCheckpointType().isSynchronous()) {
                             setSynchronousSavepointId(checkpointMetaData.getCheckpointId());
 
-                            if (advanceToEndOfTime) {
+                            if (checkpointOptions.getCheckpointType().shouldAdvanceToEndOfTime()) {
                                 advanceToEndOfEventTime();
                             }
                         } else if (activeSyncSavepointId != null
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 904654d..9e15fe2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -283,8 +283,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                                                             new CheckpointMetaData(
                                                                     0, System.currentTimeMillis()),
                                                             CheckpointOptions
-                                                                    .forCheckpointWithDefaultLocation(),
-                                                            false)
+                                                                    .forCheckpointWithDefaultLocation())
                                                     .get()) {
                                         LifecycleTrackingStreamSource.runFinish.trigger();
                                     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 5af7bc9..19184ac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -526,7 +526,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
                 new CheckpointMetaData(checkpointId, checkpointTimestamp);
 
         task.triggerCheckpointAsync(
-                checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false);
+                checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
 
         taskStateManagerMock.getWaitForReportLatch().await();
 
@@ -569,8 +569,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
         restoredTask
                 .triggerCheckpointAsync(
                         new CheckpointMetaData(checkpointId, checkpointTimestamp),
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false)
+                        CheckpointOptions.forCheckpointWithDefaultLocation())
                 .get();
 
         restoredTaskHarness.processElement(new StreamRecord<>(8, initialTime + 8));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
index 33ad7d0..6e939dd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
@@ -1108,9 +1108,7 @@ public abstract class CheckpointBarrierAlignerTestBase {
 
         @Override
         public Future<Boolean> triggerCheckpointAsync(
-                CheckpointMetaData checkpointMetaData,
-                CheckpointOptions checkpointOptions,
-                boolean advanceToEndOfEventTime) {
+                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
             throw new UnsupportedOperationException("should never be called");
         }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
index 180ce14..44dbe29 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
@@ -49,9 +49,7 @@ class CheckpointSequenceValidator extends AbstractInvokable {
 
     @Override
     public Future<Boolean> triggerCheckpointAsync(
-            CheckpointMetaData checkpointMetaData,
-            CheckpointOptions checkpointOptions,
-            boolean advanceToEndOfEventTime) {
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
         throw new UnsupportedOperationException("should never be called");
     }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index d935af9..1c9c270 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -607,9 +607,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
         streamTask
                 .triggerCheckpointAsync(
-                        checkpointMetaData,
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false)
+                        checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation())
                 .get();
 
         // since no state was set, there shouldn't be restore calls
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
index ae3d14d..14c25d6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
@@ -285,7 +285,7 @@ public class RestoreStreamTaskTest extends TestLogger {
         testHarness.taskStateManager.setWaitForReportLatch(new OneShotLatch());
 
         streamTask.triggerCheckpointAsync(
-                checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false);
+                checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
 
         testHarness.taskStateManager.getWaitForReportLatch().await();
         long reportedCheckpointId = testHarness.taskStateManager.getReportedCheckpointId();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
index 303cf37..7308e26 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
@@ -89,8 +89,7 @@ public class SourceExternalCheckpointTriggerTest {
                 sourceTask
                         .triggerCheckpointAsync(
                                 new CheckpointMetaData(32, 829),
-                                CheckpointOptions.forCheckpointWithDefaultLocation(),
-                                false)
+                                CheckpointOptions.forCheckpointWithDefaultLocation())
                         .get());
 
         // step by step let the source thread emit elements
@@ -111,8 +110,7 @@ public class SourceExternalCheckpointTriggerTest {
                 sourceTask
                         .triggerCheckpointAsync(
                                 new CheckpointMetaData(34, 900),
-                                CheckpointOptions.forCheckpointWithDefaultLocation(),
-                                false)
+                                CheckpointOptions.forCheckpointWithDefaultLocation())
                         .get());
 
         sync.trigger();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 123fcca..b5edd92 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -98,7 +98,7 @@ public class SourceOperatorStreamTaskTest {
 
             final CheckpointOptions checkpointOptions =
                     new CheckpointOptions(
-                            CheckpointType.SYNC_SAVEPOINT,
+                            CheckpointType.SAVEPOINT_TERMINATE,
                             CheckpointStorageLocationReference.getDefault());
             triggerCheckpointWaitForFinish(testHarness, checkpointId, checkpointOptions);
 
@@ -202,7 +202,7 @@ public class SourceOperatorStreamTaskTest {
         Future<Boolean> checkpointFuture =
                 testHarness
                         .getStreamTask()
-                        .triggerCheckpointAsync(checkpointMetaData, checkpointOptions, true);
+                        .triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
 
         // Wait until the checkpoint finishes.
         // We have to mark the source reader as available here, otherwise the runMailboxStep() call
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index ebafdbf..8ba4739 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -76,7 +76,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-import static org.apache.flink.runtime.checkpoint.CheckpointType.SYNC_SAVEPOINT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT_SUSPEND;
 import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -115,8 +115,7 @@ public class SourceStreamTaskTest {
         Future<Boolean> triggerFuture =
                 harness.streamTask.triggerCheckpointAsync(
                         new CheckpointMetaData(1, 1),
-                        new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
-                        false);
+                        new CheckpointOptions(SAVEPOINT_SUSPEND, getDefault()));
         while (!triggerFuture.isDone()) {
             harness.streamTask.runMailboxStep();
         }
@@ -175,8 +174,7 @@ public class SourceStreamTaskTest {
         Future<Boolean> triggerFuture =
                 harness.streamTask.triggerCheckpointAsync(
                         new CheckpointMetaData(1L, System.currentTimeMillis()),
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false);
+                        CheckpointOptions.forCheckpointWithDefaultLocation());
 
         assertFalse(triggerFuture.isDone());
         Thread.sleep(sleepTime);
@@ -590,8 +588,7 @@ public class SourceStreamTaskTest {
             Future<Boolean> triggerFuture =
                     harness.streamTask.triggerCheckpointAsync(
                             new CheckpointMetaData(checkpointId, 1),
-                            new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
-                            false);
+                            new CheckpointOptions(SAVEPOINT_SUSPEND, getDefault()));
             while (!triggerFuture.isDone()) {
                 harness.streamTask.runMailboxStep();
             }
@@ -711,8 +708,7 @@ public class SourceStreamTaskTest {
                 try {
                     sourceTask.triggerCheckpointAsync(
                             new CheckpointMetaData(currentCheckpointId, 0L),
-                            CheckpointOptions.forCheckpointWithDefaultLocation(),
-                            false);
+                            CheckpointOptions.forCheckpointWithDefaultLocation());
                 } catch (RejectedExecutionException e) {
                     // We are late with a checkpoint, the mailbox is already closed.
                     return false;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
index 43df989..9aa0fcc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
@@ -74,7 +74,7 @@ public class SourceTaskTerminationTest extends TestLogger {
         stopWithSavepointStreamTaskTestHelper(false);
     }
 
-    private void stopWithSavepointStreamTaskTestHelper(final boolean withMaxWatermark)
+    private void stopWithSavepointStreamTaskTestHelper(final boolean shouldTerminate)
             throws Exception {
         final long syncSavepointId = 34L;
 
@@ -90,8 +90,7 @@ public class SourceTaskTerminationTest extends TestLogger {
 
         srcTask.triggerCheckpointAsync(
                         new CheckpointMetaData(31L, 900),
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false)
+                        CheckpointOptions.forCheckpointWithDefaultLocation())
                 .get();
 
         verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 31L);
@@ -101,12 +100,13 @@ public class SourceTaskTerminationTest extends TestLogger {
         srcTask.triggerCheckpointAsync(
                         new CheckpointMetaData(syncSavepointId, 900),
                         new CheckpointOptions(
-                                CheckpointType.SYNC_SAVEPOINT,
-                                CheckpointStorageLocationReference.getDefault()),
-                        withMaxWatermark)
+                                shouldTerminate
+                                        ? CheckpointType.SAVEPOINT_TERMINATE
+                                        : CheckpointType.SAVEPOINT_SUSPEND,
+                                CheckpointStorageLocationReference.getDefault()))
                 .get();
 
-        if (withMaxWatermark) {
+        if (shouldTerminate) {
             // if we are in TERMINATE mode, we expect the source task
             // to emit MAX_WM before the SYNC_SAVEPOINT barrier.
             verifyWatermark(srcTaskTestHarness.getOutput(), Watermark.MAX_WATERMARK);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
index 1d7f33e..17b604d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
@@ -74,8 +74,7 @@ public class StreamTaskExecutionDecorationTest {
                 new CheckpointMetaData(1, 2),
                 new CheckpointOptions(
                         CheckpointType.CHECKPOINT,
-                        new CheckpointStorageLocationReference(new byte[] {1})),
-                false);
+                        new CheckpointStorageLocationReference(new byte[] {1})));
         Assert.assertTrue("mailbox is empty", mailbox.hasMail());
         Assert.assertFalse("execution decorator was called preliminary", decorator.wasCalled());
         mailbox.drain()
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 12f3676..235cdef 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -199,8 +199,7 @@ public class StreamTaskTerminationTest extends TestLogger {
         task.triggerCheckpointBarrier(
                 checkpointId,
                 checkpointTimestamp,
-                CheckpointOptions.forCheckpointWithDefaultLocation(),
-                false);
+                CheckpointOptions.forCheckpointWithDefaultLocation());
 
         // wait until the task has completed execution
         taskRun.get();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index bfd7d54..904097e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -154,7 +154,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-import static org.apache.flink.runtime.checkpoint.CheckpointType.SYNC_SAVEPOINT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT_SUSPEND;
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
 import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY;
@@ -237,7 +237,7 @@ public class StreamTaskTest extends TestLogger {
                     try {
                         harness.streamTask.triggerCheckpointOnBarrier(
                                 new CheckpointMetaData(checkpointId, checkpointId),
-                                new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
+                                new CheckpointOptions(SAVEPOINT_SUSPEND, getDefault()),
                                 new CheckpointMetrics());
                     } catch (IOException e) {
                         fail(e.getMessage());
@@ -649,8 +649,7 @@ public class StreamTaskTest extends TestLogger {
 
         streamTask.triggerCheckpointAsync(
                 new CheckpointMetaData(42L, 1L),
-                CheckpointOptions.forCheckpointWithDefaultLocation(),
-                false);
+                CheckpointOptions.forCheckpointWithDefaultLocation());
 
         try {
             task.waitForTaskCompletion(false);
@@ -701,8 +700,7 @@ public class StreamTaskTest extends TestLogger {
 
         streamTask.triggerCheckpointAsync(
                 new CheckpointMetaData(42L, 1L),
-                CheckpointOptions.forCheckpointWithDefaultLocation(),
-                false);
+                CheckpointOptions.forCheckpointWithDefaultLocation());
 
         final Throwable uncaughtException = uncaughtExceptionHandler.waitForUncaughtException();
         assertThat(uncaughtException, is(failingCause));
@@ -752,8 +750,7 @@ public class StreamTaskTest extends TestLogger {
             streamTask
                     .triggerCheckpointAsync(
                             new CheckpointMetaData(42L, 1L),
-                            CheckpointOptions.forCheckpointWithDefaultLocation(),
-                            false)
+                            CheckpointOptions.forCheckpointWithDefaultLocation())
                     .get();
 
             // wait for the completion of the async task
@@ -868,8 +865,7 @@ public class StreamTaskTest extends TestLogger {
             final long checkpointId = 42L;
             streamTask.triggerCheckpointAsync(
                     new CheckpointMetaData(checkpointId, 1L),
-                    CheckpointOptions.forCheckpointWithDefaultLocation(),
-                    false);
+                    CheckpointOptions.forCheckpointWithDefaultLocation());
 
             acknowledgeCheckpointLatch.await();
 
@@ -958,8 +954,7 @@ public class StreamTaskTest extends TestLogger {
         final long checkpointId = 42L;
         task.streamTask.triggerCheckpointAsync(
                 new CheckpointMetaData(checkpointId, 1L),
-                CheckpointOptions.forCheckpointWithDefaultLocation(),
-                false);
+                CheckpointOptions.forCheckpointWithDefaultLocation());
 
         rawKeyedStateHandleFuture.awaitRun();
 
@@ -1044,8 +1039,7 @@ public class StreamTaskTest extends TestLogger {
 
             task.streamTask.triggerCheckpointAsync(
                     new CheckpointMetaData(42L, 1L),
-                    CheckpointOptions.forCheckpointWithDefaultLocation(),
-                    false);
+                    CheckpointOptions.forCheckpointWithDefaultLocation());
 
             checkpointCompletedLatch.await(30, TimeUnit.SECONDS);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index ba122d5..9b5452f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -108,9 +108,8 @@ public class SynchronousCheckpointITCase {
                     42,
                     156865867234L,
                     new CheckpointOptions(
-                            CheckpointType.SYNC_SAVEPOINT,
-                            CheckpointStorageLocationReference.getDefault()),
-                    true);
+                            CheckpointType.SAVEPOINT_SUSPEND,
+                            CheckpointStorageLocationReference.getDefault()));
 
             assertThat(eventQueue.take(), is(Event.PRE_TRIGGER_CHECKPOINT));
             assertThat(eventQueue.take(), is(Event.POST_TRIGGER_CHECKPOINT));
@@ -153,14 +152,11 @@ public class SynchronousCheckpointITCase {
 
         @Override
         public Future<Boolean> triggerCheckpointAsync(
-                CheckpointMetaData checkpointMetaData,
-                CheckpointOptions checkpointOptions,
-                boolean advanceToEndOfEventTime) {
+                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
             try {
                 eventQueue.put(Event.PRE_TRIGGER_CHECKPOINT);
                 Future<Boolean> result =
-                        super.triggerCheckpointAsync(
-                                checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+                        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
                 eventQueue.put(Event.POST_TRIGGER_CHECKPOINT);
                 return result;
             } catch (InterruptedException e) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
index 5d75d38..0a4949c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
@@ -111,9 +111,8 @@ public class SynchronousCheckpointTest {
         streamTaskUnderTest.triggerCheckpointAsync(
                 new CheckpointMetaData(42, System.currentTimeMillis()),
                 new CheckpointOptions(
-                        CheckpointType.SYNC_SAVEPOINT,
-                        CheckpointStorageLocationReference.getDefault()),
-                false);
+                        CheckpointType.SAVEPOINT_SUSPEND,
+                        CheckpointStorageLocationReference.getDefault()));
         waitForSyncSavepointIdToBeSet(streamTaskUnderTest);
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index 53a0612..f72d53a 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -351,19 +351,16 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
 
         @Override
         public Future<Boolean> triggerCheckpointAsync(
-                CheckpointMetaData checkpointMetaData,
-                CheckpointOptions checkpointOptions,
-                boolean advanceToEndOfEventTime) {
+                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
             final long checkpointId = checkpointMetaData.getCheckpointId();
             final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
 
-            if (checkpointType == CheckpointType.SYNC_SAVEPOINT) {
+            if (checkpointType == CheckpointType.SAVEPOINT_SUSPEND) {
                 synchronousSavepointId = checkpointId;
                 syncSavepointId.compareAndSet(-1, synchronousSavepointId);
             }
 
-            return super.triggerCheckpointAsync(
-                    checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
         }
 
         @Override
@@ -435,8 +432,7 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
         @Override
         public Future<Boolean> triggerCheckpointAsync(
                 final CheckpointMetaData checkpointMetaData,
-                final CheckpointOptions checkpointOptions,
-                final boolean advanceToEndOfEventTime) {
+                final CheckpointOptions checkpointOptions) {
             final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
             if (taskIndex == 0) {
                 checkpointsToWaitFor.countDown();
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index ecb9d5e..cd79163 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -239,8 +239,7 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase {
         @Override
         public Future<Boolean> triggerCheckpointAsync(
                 final CheckpointMetaData checkpointMetaData,
-                final CheckpointOptions checkpointOptions,
-                final boolean advanceToEndOfEventTime) {
+                final CheckpointOptions checkpointOptions) {
             final TaskStateSnapshot checkpointStateHandles = new TaskStateSnapshot();
             checkpointStateHandles.putSubtaskStateByOperatorID(
                     OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()),
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
index dd282e7..95886dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
@@ -183,9 +183,7 @@ public class StatefulOperatorChainedTaskTest {
 
         while (!streamTask
                 .triggerCheckpointAsync(
-                        checkpointMetaData,
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
-                        false)
+                        checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation())
                 .get()) {}
 
         testHarness.getTaskStateManager().getWaitForReportLatch().await();