You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/02/25 10:31:23 UTC
[flink] 02/03: [FLINK-21453][checkpointing][refactor] Replace
advanceToEndOfTime with new CheckpointType.SAVEPOINT_TERMINATE
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7cb2154d7febe419d4d4b80900984c4132a90194
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Feb 23 20:34:00 2021 +0100
[FLINK-21453][checkpointing][refactor] Replace advanceToEndOfTime with new CheckpointType.SAVEPOINT_TERMINATE
---
.../runtime/checkpoint/CheckpointCoordinator.java | 44 ++++++++--------------
.../runtime/checkpoint/CheckpointProperties.java | 19 +++++-----
.../flink/runtime/checkpoint/CheckpointType.java | 34 +++++++++++++----
.../flink/runtime/dispatcher/Dispatcher.java | 5 +--
.../flink/runtime/executiongraph/Execution.java | 27 ++++---------
.../network/api/serialization/EventSerializer.java | 16 +++++---
.../runtime/jobgraph/tasks/AbstractInvokable.java | 6 +--
.../jobmanager/slots/TaskManagerGateway.java | 5 +--
.../apache/flink/runtime/jobmaster/JobMaster.java | 6 +--
.../flink/runtime/jobmaster/JobMasterGateway.java | 5 +--
.../runtime/jobmaster/RpcTaskManagerGateway.java | 9 +----
.../flink/runtime/minicluster/MiniCluster.java | 4 +-
.../handler/job/savepoints/SavepointHandlers.java | 4 +-
.../flink/runtime/scheduler/SchedulerBase.java | 4 +-
.../flink/runtime/scheduler/SchedulerNG.java | 3 +-
.../flink/runtime/taskexecutor/TaskExecutor.java | 9 ++---
.../runtime/taskexecutor/TaskExecutorGateway.java | 5 +--
.../org/apache/flink/runtime/taskmanager/Task.java | 8 +---
.../flink/runtime/webmonitor/RestfulGateway.java | 5 +--
.../checkpoint/CheckpointCoordinatorTest.java | 15 +++-----
.../CheckpointCoordinatorTestingUtils.java | 3 +-
.../CheckpointCoordinatorTriggeringTest.java | 33 +++-------------
.../checkpoint/CheckpointRequestDeciderTest.java | 7 +---
.../runtime/checkpoint/CheckpointTypeTest.java | 2 +-
.../runtime/checkpoint/PendingCheckpointTest.java | 2 +-
.../utils/SimpleAckingTaskManagerGateway.java | 20 ++--------
.../serialization/CheckpointSerializationTest.java | 2 +-
.../api/serialization/EventSerializerTest.java | 25 +++++++++++-
.../jobmaster/utils/TestingJobMasterGateway.java | 6 +--
.../CoordinatorEventsExactlyOnceITCase.java | 4 +-
.../runtime/scheduler/DefaultSchedulerTest.java | 7 +---
.../runtime/scheduler/TestingSchedulerNG.java | 3 +-
.../taskexecutor/TestingTaskExecutorGateway.java | 3 +-
.../runtime/taskmanager/TaskAsyncCallTest.java | 16 ++------
.../runtime/webmonitor/TestingRestfulGateway.java | 2 +-
.../streaming/state/RocksDBAsyncSnapshotTest.java | 6 +--
.../runtime/tasks/SourceOperatorStreamTask.java | 9 ++---
.../streaming/runtime/tasks/SourceStreamTask.java | 9 ++---
.../flink/streaming/runtime/tasks/StreamTask.java | 28 ++++----------
.../AbstractUdfStreamOperatorLifecycleTest.java | 3 +-
.../api/operators/async/AsyncWaitOperatorTest.java | 5 +--
.../io/CheckpointBarrierAlignerTestBase.java | 4 +-
.../runtime/io/CheckpointSequenceValidator.java | 4 +-
.../runtime/tasks/OneInputStreamTaskTest.java | 4 +-
.../runtime/tasks/RestoreStreamTaskTest.java | 2 +-
.../tasks/SourceExternalCheckpointTriggerTest.java | 6 +--
.../tasks/SourceOperatorStreamTaskTest.java | 4 +-
.../runtime/tasks/SourceStreamTaskTest.java | 14 +++----
.../runtime/tasks/SourceTaskTerminationTest.java | 14 +++----
.../tasks/StreamTaskExecutionDecorationTest.java | 3 +-
.../runtime/tasks/StreamTaskTerminationTest.java | 3 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 22 ++++-------
.../runtime/tasks/SynchronousCheckpointITCase.java | 12 ++----
.../runtime/tasks/SynchronousCheckpointTest.java | 5 +--
.../jobmaster/JobMasterStopWithSavepointIT.java | 12 ++----
.../jobmaster/JobMasterTriggerSavepointITCase.java | 3 +-
.../state/StatefulOperatorChainedTaskTest.java | 4 +-
57 files changed, 210 insertions(+), 334 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 46da0d2..e0645fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointType.PostCheckpointAction;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -441,14 +442,13 @@ public class CheckpointCoordinator {
@Nullable final String targetLocation) {
final CheckpointProperties properties =
CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
- return triggerSavepointInternal(properties, false, targetLocation);
+ return triggerSavepointInternal(properties, targetLocation);
}
/**
* Triggers a synchronous savepoint with the given savepoint directory as a target.
*
- * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
- * MAX_WATERMARK} in the pipeline to fire any registered event-time timers.
+ * @param terminate flag indicating if the job should terminate or just suspend
* @param targetLocation Target location for the savepoint, optional. If null, the state
* backend's configured default will be used.
* @return A future to the completed checkpoint
@@ -456,17 +456,16 @@ public class CheckpointCoordinator {
* savepoint directory has been configured
*/
public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
- final boolean advanceToEndOfEventTime, @Nullable final String targetLocation) {
+ final boolean terminate, @Nullable final String targetLocation) {
final CheckpointProperties properties =
- CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled);
+ CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
- return triggerSavepointInternal(properties, advanceToEndOfEventTime, targetLocation);
+ return triggerSavepointInternal(properties, targetLocation);
}
private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
final CheckpointProperties checkpointProperties,
- final boolean advanceToEndOfEventTime,
@Nullable final String targetLocation) {
checkNotNull(checkpointProperties);
@@ -476,11 +475,7 @@ public class CheckpointCoordinator {
final CompletableFuture<CompletedCheckpoint> resultFuture = new CompletableFuture<>();
timer.execute(
() ->
- triggerCheckpoint(
- checkpointProperties,
- targetLocation,
- false,
- advanceToEndOfEventTime)
+ triggerCheckpoint(checkpointProperties, targetLocation, false)
.whenComplete(
(completedCheckpoint, throwable) -> {
if (throwable == null) {
@@ -502,25 +497,24 @@ public class CheckpointCoordinator {
* @return a future to the completed checkpoint.
*/
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic) {
- return triggerCheckpoint(checkpointProperties, null, isPeriodic, false);
+ return triggerCheckpoint(checkpointProperties, null, isPeriodic);
}
@VisibleForTesting
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
CheckpointProperties props,
@Nullable String externalSavepointLocation,
- boolean isPeriodic,
- boolean advanceToEndOfTime) {
+ boolean isPeriodic) {
- if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
+ if (props.getCheckpointType().getPostCheckpointAction() == PostCheckpointAction.TERMINATE
+ && !(props.isSynchronous() && props.isSavepoint())) {
return FutureUtils.completedExceptionally(
new IllegalArgumentException(
"Only synchronous savepoints are allowed to advance the watermark to MAX."));
}
CheckpointTriggerRequest request =
- new CheckpointTriggerRequest(
- props, externalSavepointLocation, isPeriodic, advanceToEndOfTime);
+ new CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic);
chooseRequestToExecute(request).ifPresent(this::startTriggeringCheckpoint);
return request.onCompletionPromise;
}
@@ -619,8 +613,7 @@ public class CheckpointCoordinator {
checkpointId,
checkpoint.getCheckpointStorageLocation(),
request.props,
- executions,
- request.advanceToEndOfTime);
+ executions);
coordinatorsToCheckpoint.forEach(
(ctx) ->
@@ -820,8 +813,7 @@ public class CheckpointCoordinator {
long checkpointID,
CheckpointStorageLocation checkpointStorageLocation,
CheckpointProperties props,
- Execution[] executions,
- boolean advanceToEndOfTime) {
+ Execution[] executions) {
final CheckpointOptions checkpointOptions =
new CheckpointOptions(
@@ -834,8 +826,7 @@ public class CheckpointCoordinator {
// send the messages to the tasks that trigger their checkpoint
for (Execution execution : executions) {
if (props.isSynchronous()) {
- execution.triggerSynchronousSavepoint(
- checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
+ execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions);
} else {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
@@ -2037,21 +2028,18 @@ public class CheckpointCoordinator {
final CheckpointProperties props;
final @Nullable String externalSavepointLocation;
final boolean isPeriodic;
- final boolean advanceToEndOfTime;
private final CompletableFuture<CompletedCheckpoint> onCompletionPromise =
new CompletableFuture<>();
CheckpointTriggerRequest(
CheckpointProperties props,
@Nullable String externalSavepointLocation,
- boolean isPeriodic,
- boolean advanceToEndOfTime) {
+ boolean isPeriodic) {
this.timestamp = System.currentTimeMillis();
this.props = checkNotNull(props);
this.externalSavepointLocation = externalSavepointLocation;
this.isPeriodic = isPeriodic;
- this.advanceToEndOfTime = advanceToEndOfTime;
}
CompletableFuture<CompletedCheckpoint> getOnCompletionFuture() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 96162a2..91c9374 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -238,14 +238,6 @@ public class CheckpointProperties implements Serializable {
// Factories and pre-configured properties
// ------------------------------------------------------------------------
- private static final CheckpointProperties SYNC_SAVEPOINT =
- new CheckpointProperties(
- true, CheckpointType.SYNC_SAVEPOINT, false, false, false, false, false);
-
- private static final CheckpointProperties SYNC_SAVEPOINT_NOT_FORCED =
- new CheckpointProperties(
- false, CheckpointType.SYNC_SAVEPOINT, false, false, false, false, false);
-
private static final CheckpointProperties SAVEPOINT =
new CheckpointProperties(
true, CheckpointType.SAVEPOINT, false, false, false, false, false);
@@ -296,8 +288,15 @@ public class CheckpointProperties implements Serializable {
return forced ? SAVEPOINT : SAVEPOINT_NO_FORCE;
}
- public static CheckpointProperties forSyncSavepoint(boolean forced) {
- return forced ? SYNC_SAVEPOINT : SYNC_SAVEPOINT_NOT_FORCED;
+ public static CheckpointProperties forSyncSavepoint(boolean forced, boolean terminate) {
+ return new CheckpointProperties(
+ forced,
+ terminate ? CheckpointType.SAVEPOINT_TERMINATE : CheckpointType.SAVEPOINT_SUSPEND,
+ false,
+ false,
+ false,
+ false,
+ false);
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
index a1df789..02c5c1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
@@ -22,22 +22,25 @@ package org.apache.flink.runtime.checkpoint;
public enum CheckpointType {
/** A checkpoint, full or incremental. */
- CHECKPOINT(false, false),
+ CHECKPOINT(false, PostCheckpointAction.NONE),
/** A regular savepoint. */
- SAVEPOINT(true, false),
+ SAVEPOINT(true, PostCheckpointAction.NONE),
- /** A savepoint taken while suspending/terminating the job. */
- SYNC_SAVEPOINT(true, true);
+ /** A savepoint taken while suspending the job. */
+ SAVEPOINT_SUSPEND(true, PostCheckpointAction.SUSPEND),
+
+ /** A savepoint taken while terminating the job. */
+ SAVEPOINT_TERMINATE(true, PostCheckpointAction.TERMINATE);
private final boolean isSavepoint;
- private final boolean isSynchronous;
+ private final PostCheckpointAction postCheckpointAction;
- CheckpointType(final boolean isSavepoint, final boolean isSynchronous) {
+ CheckpointType(final boolean isSavepoint, final PostCheckpointAction postCheckpointAction) {
this.isSavepoint = isSavepoint;
- this.isSynchronous = isSynchronous;
+ this.postCheckpointAction = postCheckpointAction;
}
public boolean isSavepoint() {
@@ -45,6 +48,21 @@ public enum CheckpointType {
}
public boolean isSynchronous() {
- return isSynchronous;
+ return postCheckpointAction != PostCheckpointAction.NONE;
+ }
+
+ public PostCheckpointAction getPostCheckpointAction() {
+ return postCheckpointAction;
+ }
+
+ public boolean shouldAdvanceToEndOfTime() {
+ return getPostCheckpointAction() == PostCheckpointAction.TERMINATE;
+ }
+
+ /** What's the intended action after the checkpoint (relevant for stopping with savepoint). */
+ public enum PostCheckpointAction {
+ NONE,
+ SUSPEND,
+ TERMINATE
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 18939bb..a586f59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -729,15 +729,14 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
public CompletableFuture<String> stopWithSavepoint(
final JobID jobId,
final String targetDirectory,
- final boolean advanceToEndOfEventTime,
+ final boolean terminate,
final Time timeout) {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
getJobMasterGatewayFuture(jobId);
return jobMasterGatewayFuture.thenCompose(
(JobMasterGateway jobMasterGateway) ->
- jobMasterGateway.stopWithSavepoint(
- targetDirectory, advanceToEndOfEventTime, timeout));
+ jobMasterGateway.stopWithSavepoint(targetDirectory, terminate, timeout));
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 50f05ac..8fa4e61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.CheckpointType.PostCheckpointAction;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -1111,7 +1112,7 @@ public class Execution
*/
public void triggerCheckpoint(
long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
- triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions, false);
+ triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions);
}
/**
@@ -1120,26 +1121,17 @@ public class Execution
* @param checkpointId of the checkpoint to trigger
* @param timestamp of the checkpoint to trigger
* @param checkpointOptions of the checkpoint to trigger
- * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
- * MAX_WATERMARK} in the pipeline to fire any registered event-time timers
*/
public void triggerSynchronousSavepoint(
- long checkpointId,
- long timestamp,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
- triggerCheckpointHelper(
- checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
+ long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
+ triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions);
}
private void triggerCheckpointHelper(
- long checkpointId,
- long timestamp,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
- if (advanceToEndOfEventTime
+ if (checkpointType.getPostCheckpointAction() == PostCheckpointAction.TERMINATE
&& !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
throw new IllegalArgumentException(
"Only synchronous savepoints are allowed to advance the watermark to MAX.");
@@ -1151,12 +1143,7 @@ public class Execution
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
taskManagerGateway.triggerCheckpoint(
- attemptId,
- getVertex().getJobId(),
- checkpointId,
- timestamp,
- checkpointOptions,
- advanceToEndOfEventTime);
+ attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
} else {
LOG.debug(
"The execution has no slot assigned. This indicates that the execution is no longer running.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 210d7c7..f0a3cea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -64,7 +64,9 @@ public class EventSerializer {
private static final int CHECKPOINT_TYPE_SAVEPOINT = 1;
- private static final int CHECKPOINT_TYPE_SYNC_SAVEPOINT = 2;
+ private static final int CHECKPOINT_TYPE_SAVEPOINT_SUSPEND = 2;
+
+ private static final int CHECKPOINT_TYPE_SAVEPOINT_TERMINATE = 3;
// ------------------------------------------------------------------------
// Serialization Logic
@@ -220,8 +222,10 @@ public class EventSerializer {
typeInt = CHECKPOINT_TYPE_CHECKPOINT;
} else if (checkpointType == CheckpointType.SAVEPOINT) {
typeInt = CHECKPOINT_TYPE_SAVEPOINT;
- } else if (checkpointType == CheckpointType.SYNC_SAVEPOINT) {
- typeInt = CHECKPOINT_TYPE_SYNC_SAVEPOINT;
+ } else if (checkpointType == CheckpointType.SAVEPOINT_SUSPEND) {
+ typeInt = CHECKPOINT_TYPE_SAVEPOINT_SUSPEND;
+ } else if (checkpointType == CheckpointType.SAVEPOINT_TERMINATE) {
+ typeInt = CHECKPOINT_TYPE_SAVEPOINT_TERMINATE;
} else {
throw new IOException("Unknown checkpoint type: " + checkpointType);
}
@@ -257,8 +261,10 @@ public class EventSerializer {
checkpointType = CheckpointType.CHECKPOINT;
} else if (checkpointTypeCode == CHECKPOINT_TYPE_SAVEPOINT) {
checkpointType = CheckpointType.SAVEPOINT;
- } else if (checkpointTypeCode == CHECKPOINT_TYPE_SYNC_SAVEPOINT) {
- checkpointType = CheckpointType.SYNC_SAVEPOINT;
+ } else if (checkpointTypeCode == CHECKPOINT_TYPE_SAVEPOINT_SUSPEND) {
+ checkpointType = CheckpointType.SAVEPOINT_SUSPEND;
+ } else if (checkpointTypeCode == CHECKPOINT_TYPE_SAVEPOINT_TERMINATE) {
+ checkpointType = CheckpointType.SAVEPOINT_TERMINATE;
} else {
throw new IOException("Unknown checkpoint type code: " + checkpointTypeCode);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 22fbb8a..c98ed6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -209,15 +209,11 @@ public abstract class AbstractInvokable {
*
* @param checkpointMetaData Metadata about this checkpoint
* @param checkpointOptions Options for performing this checkpoint
- * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
- * MAX_WATERMARK} in the pipeline to fire any registered event-time timers
* @return future with value of {@code false} if the checkpoint was not carried out, {@code
* true} otherwise
*/
public Future<Boolean> triggerCheckpointAsync(
- CheckpointMetaData checkpointMetaData,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
throw new UnsupportedOperationException(
String.format(
"triggerCheckpointAsync not supported by %s", this.getClass().getName()));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index d209481..c115f2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -127,16 +127,13 @@ public interface TaskManagerGateway extends TaskExecutorOperatorEventGateway {
* @param checkpointId of the checkpoint to trigger
* @param timestamp of the checkpoint to trigger
* @param checkpointOptions of the checkpoint to trigger
- * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
- * MAX_WATERMARK} in the pipeline to fire any registered event-time timers
*/
void triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
long timestamp,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime);
+ CheckpointOptions checkpointOptions);
/**
* Frees the slot with the given allocation ID.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 428a0cc..6cae89f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -725,11 +725,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
@Override
public CompletableFuture<String> stopWithSavepoint(
- @Nullable final String targetDirectory,
- final boolean advanceToEndOfEventTime,
- final Time timeout) {
+ @Nullable final String targetDirectory, final boolean terminate, final Time timeout) {
- return schedulerNG.stopWithSavepoint(targetDirectory, advanceToEndOfEventTime);
+ return schedulerNG.stopWithSavepoint(targetDirectory, terminate);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index b3345dc..6237985 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -234,14 +234,13 @@ public interface JobMasterGateway
*
* @param targetDirectory to which to write the savepoint data or null if the default savepoint
* directory should be used
- * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
- * MAX_WATERMARK} in the pipeline to fire any registered event-time timers
+ * @param terminate flag indicating if the job should terminate or just suspend
* @param timeout for the rpc call
* @return Future which is completed with the savepoint path once completed
*/
CompletableFuture<String> stopWithSavepoint(
@Nullable final String targetDirectory,
- final boolean advanceToEndOfEventTime,
+ final boolean terminate,
@RpcTimeout final Time timeout);
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index a7eb15f..714110e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -105,14 +105,9 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
JobID jobId,
long checkpointId,
long timestamp,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ CheckpointOptions checkpointOptions) {
taskExecutorGateway.triggerCheckpoint(
- executionAttemptID,
- checkpointId,
- timestamp,
- checkpointOptions,
- advanceToEndOfEventTime);
+ executionAttemptID, checkpointId, timestamp, checkpointOptions);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 93335bd..21da08f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -668,11 +668,11 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
}
public CompletableFuture<String> stopWithSavepoint(
- JobID jobId, String targetDirectory, boolean advanceToEndOfEventTime) {
+ JobID jobId, String targetDirectory, boolean terminate) {
return runDispatcherCommand(
dispatcherGateway ->
dispatcherGateway.stopWithSavepoint(
- jobId, targetDirectory, advanceToEndOfEventTime, rpcTimeout));
+ jobId, targetDirectory, terminate, rpcTimeout));
}
public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
index 8d9f710..35bfe85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
@@ -167,13 +167,13 @@ public class SavepointHandlers
HttpResponseStatus.BAD_REQUEST);
}
- final boolean advanceToEndOfEventTime = request.getRequestBody().shouldDrain();
+ final boolean shouldDrain = request.getRequestBody().shouldDrain();
final String targetDirectory =
requestedTargetDirectory != null
? requestedTargetDirectory
: defaultSavepointDir;
return gateway.stopWithSavepoint(
- jobId, targetDirectory, advanceToEndOfEventTime, RpcUtils.INF_TIMEOUT);
+ jobId, targetDirectory, shouldDrain, RpcUtils.INF_TIMEOUT);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 1396c6d..c88ac57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -1045,7 +1045,7 @@ public abstract class SchedulerBase implements SchedulerNG {
@Override
public CompletableFuture<String> stopWithSavepoint(
- final String targetDirectory, final boolean advanceToEndOfEventTime) {
+ final String targetDirectory, final boolean terminate) {
mainThreadExecutor.assertRunningInMainThread();
final CheckpointCoordinator checkpointCoordinator =
@@ -1082,7 +1082,7 @@ public abstract class SchedulerBase implements SchedulerNG {
final CompletableFuture<String> savepointFuture =
checkpointCoordinator
- .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
+ .triggerSynchronousSavepoint(terminate, targetDirectory)
.thenApply(CompletedCheckpoint::getExternalPointer);
final CompletableFuture<JobStatus> terminationFuture =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 06e74e6..e2fd16b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -146,8 +146,7 @@ public interface SchedulerNG {
void declineCheckpoint(DeclineCheckpoint decline);
- CompletableFuture<String> stopWithSavepoint(
- String targetDirectory, boolean advanceToEndOfEventTime);
+ CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate);
// ------------------------------------------------------------------------
// Operator Coordinator related methods
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 7e28e6e..bd224e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.CheckpointType.PostCheckpointAction;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -887,8 +888,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
ExecutionAttemptID executionAttemptID,
long checkpointId,
long checkpointTimestamp,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ CheckpointOptions checkpointOptions) {
log.debug(
"Trigger checkpoint {}@{} for {}.",
checkpointId,
@@ -896,7 +896,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
executionAttemptID);
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
- if (advanceToEndOfEventTime
+ if (checkpointType.getPostCheckpointAction() == PostCheckpointAction.TERMINATE
&& !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
throw new IllegalArgumentException(
"Only synchronous savepoints are allowed to advance the watermark to MAX.");
@@ -905,8 +905,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
- task.triggerCheckpointBarrier(
- checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);
+ task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 8e59cee..829018d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -133,16 +133,13 @@ public interface TaskExecutorGateway extends RpcGateway, TaskExecutorOperatorEve
* @param checkpointID unique id for the checkpoint
* @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
* @param checkpointOptions for performing the checkpoint
- * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
- * MAX_WATERMARK} in the pipeline to fire any registered event-time timers
* @return Future acknowledge if the checkpoint has been successfully triggered
*/
CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointID,
long checkpointTimestamp,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime);
+ CheckpointOptions checkpointOptions);
/**
* Confirm a checkpoint for the given task. The checkpoint is identified by the checkpoint ID
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index dbfdba5..d48a750 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1240,14 +1240,11 @@ public class Task
* @param checkpointID The ID identifying the checkpoint.
* @param checkpointTimestamp The timestamp associated with the checkpoint.
* @param checkpointOptions Options for performing this checkpoint.
- * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
- * MAX_WATERMARK} in the pipeline to fire any registered event-time timers.
*/
public void triggerCheckpointBarrier(
final long checkpointID,
final long checkpointTimestamp,
- final CheckpointOptions checkpointOptions,
- final boolean advanceToEndOfEventTime) {
+ final CheckpointOptions checkpointOptions) {
final AbstractInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData =
@@ -1255,8 +1252,7 @@ public class Task
if (executionState == ExecutionState.RUNNING && invokable != null) {
try {
- invokable.triggerCheckpointAsync(
- checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+ invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
} catch (RejectedExecutionException ex) {
// This may happen if the mailbox is closed. It means that the task is shutting
// down, so we just ignore it.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 7a54ae9..3aea713 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -135,15 +135,14 @@ public interface RestfulGateway extends RpcGateway {
* @param jobId ID of the job for which the savepoint should be triggered.
* @param targetDirectory to which to write the savepoint data or null if the default savepoint
* directory should be used
- * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code
- * MAX_WATERMARK} in the pipeline to fire any registered event-time timers
+ * @param terminate flag indicating if the job should terminate or just suspend
* @param timeout for the rpc call
* @return Future which is completed with the savepoint path once completed
*/
default CompletableFuture<String> stopWithSavepoint(
final JobID jobId,
final String targetDirectory,
- final boolean advanceToEndOfEventTime,
+ final boolean terminate,
@RpcTimeout final Time timeout) {
throw new UnsupportedOperationException();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 6379c8b..b2aa6db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2694,8 +2694,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
null,
- true,
- false);
+ true);
manuallyTriggeredScheduledExecutor.triggerAll();
try {
onCompletionPromise.get();
@@ -2789,8 +2788,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
jid,
checkpointId,
timestamp,
- checkpointOptions,
- advanceToEndOfEventTime) -> {});
+ checkpointOptions) -> {});
ExecutionVertex vertex2 =
mockExecutionVertex(
attemptID2,
@@ -2798,8 +2796,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
jid,
checkpointId,
timestamp,
- checkpointOptions,
- advanceToEndOfEventTime) -> {});
+ checkpointOptions) -> {});
OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
@@ -2953,8 +2950,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
jid,
checkpointId,
timestamp,
- checkpointOptions,
- advanceToEndOfEventTime) -> {});
+ checkpointOptions) -> {});
ExecutionVertex vertex2 =
mockExecutionVertex(
attemptID2,
@@ -2962,8 +2958,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
jid,
checkpointId,
timestamp,
- checkpointOptions,
- advanceToEndOfEventTime) -> {});
+ checkpointOptions) -> {});
OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 0ae6797..b573669 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -583,8 +583,7 @@ public class CheckpointCoordinatorTestingUtils {
jobId,
(long) args[0],
(long) args[1],
- (CheckpointOptions) args[2],
- false);
+ (CheckpointOptions) args[2]);
return null;
})
.when(mock)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 5509e71..db9f6b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -291,7 +291,6 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
null,
- false,
false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(onCompletionPromise2.isCompletedExceptionally());
@@ -326,12 +325,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
final ExecutionAttemptID attemptID = new ExecutionAttemptID();
final AtomicInteger taskManagerCheckpointTriggeredTimes = new AtomicInteger(0);
final SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer =
- (executionAttemptID,
- jobId,
- checkpointId,
- timestamp,
- checkpointOptions,
- advanceToEndOfEventTime) ->
+ (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) ->
taskManagerCheckpointTriggeredTimes.incrementAndGet();
ExecutionVertex vertex = mockExecutionVertex(attemptID, checkpointConsumer);
@@ -365,12 +359,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
final ExecutionAttemptID attemptID = new ExecutionAttemptID();
final AtomicInteger taskManagerCheckpointTriggeredTimes = new AtomicInteger(0);
final SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer =
- (executionAttemptID,
- jobId,
- checkpointId,
- timestamp,
- checkpointOptions,
- advanceToEndOfEventTime) ->
+ (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) ->
taskManagerCheckpointTriggeredTimes.incrementAndGet();
ExecutionVertex vertex = mockExecutionVertex(attemptID, checkpointConsumer);
@@ -415,12 +404,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
final ExecutionAttemptID attemptID = new ExecutionAttemptID();
final AtomicInteger taskManagerCheckpointTriggeredTimes = new AtomicInteger(0);
final SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer =
- (executionAttemptID,
- jobId,
- checkpointId,
- timestamp,
- checkpointOptions,
- advanceToEndOfEventTime) ->
+ (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) ->
taskManagerCheckpointTriggeredTimes.incrementAndGet();
ExecutionVertex vertex = mockExecutionVertex(attemptID, checkpointConsumer);
@@ -513,12 +497,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
final ExecutionAttemptID attemptID = new ExecutionAttemptID();
final AtomicInteger taskManagerCheckpointTriggeredTimes = new AtomicInteger(0);
final SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer =
- (executionAttemptID,
- jobId,
- checkpointId,
- timestamp,
- checkpointOptions,
- advanceToEndOfEventTime) ->
+ (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) ->
taskManagerCheckpointTriggeredTimes.incrementAndGet();
ExecutionVertex vertex = mockExecutionVertex(attemptID, checkpointConsumer);
@@ -625,8 +604,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
null,
- true,
- false);
+ true);
}
private CompletableFuture<CompletedCheckpoint> triggerNonPeriodicCheckpoint(
@@ -636,7 +614,6 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
null,
- false,
false);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
index c380835..d34d798 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
@@ -275,14 +275,11 @@ public class CheckpointRequestDeciderTest {
private static CheckpointTriggerRequest savepointRequest(boolean force, boolean periodic) {
return new CheckpointTriggerRequest(
- CheckpointProperties.forSavepoint(force), null, periodic, false);
+ CheckpointProperties.forSavepoint(force), null, periodic);
}
private static CheckpointTriggerRequest checkpointRequest(boolean periodic) {
return new CheckpointTriggerRequest(
- CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
- null,
- periodic,
- false);
+ CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION), null, periodic);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
index 227f262..481559b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
@@ -36,6 +36,6 @@ public class CheckpointTypeTest {
public void testOrdinalsAreConstant() {
assertEquals(0, CheckpointType.CHECKPOINT.ordinal());
assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
- assertEquals(2, CheckpointType.SYNC_SAVEPOINT.ordinal());
+ assertEquals(2, CheckpointType.SAVEPOINT_SUSPEND.ordinal());
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 339cca9..9635da3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -129,7 +129,7 @@ public class PendingCheckpointTest {
@Test
public void testSyncSavepointCannotBeSubsumed() throws Exception {
// Forced checkpoints cannot be subsumed
- CheckpointProperties forced = CheckpointProperties.forSyncSavepoint(true);
+ CheckpointProperties forced = CheckpointProperties.forSyncSavepoint(true, false);
PendingCheckpoint pending = createPendingCheckpoint(forced);
assertFalse(pending.canBeSubsumed());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 9e8903d..e16c209 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -61,12 +61,7 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
(ignore1, ignore2) -> {};
private CheckpointConsumer checkpointConsumer =
- (executionAttemptID,
- jobId,
- checkpointId,
- timestamp,
- checkpointOptions,
- advanceToEndOfEventTime) -> {};
+ (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) -> {};
public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> submitConsumer) {
this.submitConsumer = submitConsumer;
@@ -147,16 +142,10 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
JobID jobId,
long checkpointId,
long timestamp,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ CheckpointOptions checkpointOptions) {
checkpointConsumer.accept(
- executionAttemptID,
- jobId,
- checkpointId,
- timestamp,
- checkpointOptions,
- advanceToEndOfEventTime);
+ executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions);
}
@Override
@@ -187,7 +176,6 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
JobID jobId,
long checkpointId,
long timestamp,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime);
+ CheckpointOptions checkpointOptions);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java
index 0c698c6..855f3e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java
@@ -44,7 +44,7 @@ public class CheckpointSerializationTest {
public void testSuspendingCheckpointBarrierSerialization() throws Exception {
CheckpointOptions suspendSavepointToSerialize =
new CheckpointOptions(
- CheckpointType.SYNC_SAVEPOINT,
+ CheckpointType.SAVEPOINT_SUSPEND,
new CheckpointStorageLocationReference(STORAGE_LOCATION_REF));
testCheckpointBarrierSerialization(suspendSavepointToSerialize);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 842e7ce..6cf6b76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.api.serialization;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -27,6 +28,7 @@ import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.junit.Test;
@@ -47,7 +49,28 @@ public class EventSerializerTest {
EndOfPartitionEvent.INSTANCE,
EndOfSuperstepEvent.INSTANCE,
new CheckpointBarrier(
- 1678L, 4623784L, CheckpointOptions.forCheckpointWithDefaultLocation()),
+ 1678L,
+ 4623784L,
+ new CheckpointOptions(
+ CheckpointType.CHECKPOINT,
+ CheckpointStorageLocationReference.getDefault())),
+ new CheckpointBarrier(
+ 1678L,
+ 4623784L,
+ new CheckpointOptions(
+ CheckpointType.SAVEPOINT, CheckpointStorageLocationReference.getDefault())),
+ new CheckpointBarrier(
+ 1678L,
+ 4623784L,
+ new CheckpointOptions(
+ CheckpointType.SAVEPOINT_SUSPEND,
+ CheckpointStorageLocationReference.getDefault())),
+ new CheckpointBarrier(
+ 1678L,
+ 4623784L,
+ new CheckpointOptions(
+ CheckpointType.SAVEPOINT_TERMINATE,
+ CheckpointStorageLocationReference.getDefault())),
new TestTaskEvent(Math.random(), 12361231273L),
new CancelCheckpointMarker(287087987329842L)
};
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 2ca5f9d..3f6e6fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -400,10 +400,8 @@ public class TestingJobMasterGateway implements JobMasterGateway {
@Override
public CompletableFuture<String> stopWithSavepoint(
- @Nullable final String targetDirectory,
- final boolean advanceToEndOfEventTime,
- final Time timeout) {
- return stopWithSavepointFunction.apply(targetDirectory, advanceToEndOfEventTime);
+ @Nullable final String targetDirectory, final boolean terminate, final Time timeout) {
+ return stopWithSavepointFunction.apply(targetDirectory, terminate);
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index c1732ec..969368a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -425,9 +425,7 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
@Override
public Future<Boolean> triggerCheckpointAsync(
- CheckpointMetaData checkpointMetaData,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
actions.add(checkpointMetaData); // this signals the main thread should do a checkpoint
return CompletableFuture.completedFuture(true);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 26fa24d..08fa081 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -919,12 +919,7 @@ public class DefaultSchedulerTest extends TestLogger {
.getLogicalSlotBuilder()
.setTaskManagerGateway(taskManagerGateway);
taskManagerGateway.setCheckpointConsumer(
- (executionAttemptID,
- jobId,
- checkpointId,
- timestamp,
- checkpointOptions,
- advanceToEndOfEventTime) -> {
+ (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) -> {
checkpointTriggeredLatch.countDown();
});
return checkpointTriggeredLatch;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 67cbada..ab07a1c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -203,8 +203,7 @@ public class TestingSchedulerNG implements SchedulerNG {
}
@Override
- public CompletableFuture<String> stopWithSavepoint(
- String targetDirectory, boolean advanceToEndOfEventTime) {
+ public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
failOperation();
return null;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 9c37d1f..062a324 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -212,8 +212,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
ExecutionAttemptID executionAttemptID,
long checkpointID,
long checkpointTimestamp,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ CheckpointOptions checkpointOptions) {
return CompletableFuture.completedFuture(Acknowledge.get());
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 94044a0..d769a5f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -83,7 +83,7 @@ public class TaskAsyncCallTest extends TestLogger {
/**
* Triggered when {@link CheckpointsInOrderInvokable#triggerCheckpointAsync(CheckpointMetaData,
- * CheckpointOptions, boolean)} was called {@link #numCalls} times.
+ * CheckpointOptions)} was called {@link #numCalls} times.
*/
private static OneShotLatch triggerLatch;
@@ -121,10 +121,7 @@ public class TaskAsyncCallTest extends TestLogger {
for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(
- i,
- 156865867234L,
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false);
+ i, 156865867234L, CheckpointOptions.forCheckpointWithDefaultLocation());
}
triggerLatch.await();
@@ -147,10 +144,7 @@ public class TaskAsyncCallTest extends TestLogger {
for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(
- i,
- 156865867234L,
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false);
+ i, 156865867234L, CheckpointOptions.forCheckpointWithDefaultLocation());
task.notifyCheckpointComplete(i);
}
@@ -260,9 +254,7 @@ public class TaskAsyncCallTest extends TestLogger {
@Override
public Future<Boolean> triggerCheckpointAsync(
- CheckpointMetaData checkpointMetaData,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
lastCheckpointId++;
if (checkpointMetaData.getCheckpointId() == lastCheckpointId) {
if (lastCheckpointId == numCalls) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index 8c83ff5..7ff7ac8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -264,7 +264,7 @@ public class TestingRestfulGateway implements RestfulGateway {
@Override
public CompletableFuture<String> stopWithSavepoint(
- JobID jobId, String targetDirectory, boolean advanceToEndOfTime, Time timeout) {
+ JobID jobId, String targetDirectory, boolean terminate, Time timeout) {
return stopWithSavepointFunction.apply(jobId, targetDirectory);
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 934715b..5532eff 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -220,8 +220,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
task.triggerCheckpointAsync(
new CheckpointMetaData(42, 17),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false)
+ CheckpointOptions.forCheckpointWithDefaultLocation())
.get();
testHarness.processElement(new StreamRecord<>("Wohoo", 0));
@@ -342,8 +341,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
task.triggerCheckpointAsync(
new CheckpointMetaData(42, 17),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false)
+ CheckpointOptions.forCheckpointWithDefaultLocation())
.get();
testHarness.processElement(new StreamRecord<>("Wohoo", 0));
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index 54ffac2..8181e02 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -91,12 +91,9 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
@Override
public Future<Boolean> triggerCheckpointAsync(
- CheckpointMetaData checkpointMetaData,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
if (!isExternallyInducedSource) {
- return super.triggerCheckpointAsync(
- checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+ return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
} else {
return CompletableFuture.completedFuture(isRunning());
}
@@ -127,7 +124,7 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
final CheckpointMetaData checkpointMetaData =
new CheckpointMetaData(checkpointId, timestamp);
- super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, false);
+ super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
}
// ---------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index db1fba9..42b1ccf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -111,7 +111,7 @@ public class SourceStreamTask<
try {
SourceStreamTask.super
.triggerCheckpointAsync(
- checkpointMetaData, checkpointOptions, false)
+ checkpointMetaData, checkpointOptions)
.get();
} catch (RuntimeException e) {
throw e;
@@ -217,12 +217,9 @@ public class SourceStreamTask<
@Override
public Future<Boolean> triggerCheckpointAsync(
- CheckpointMetaData checkpointMetaData,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
if (!externallyInducedCheckpoints) {
- return super.triggerCheckpointAsync(
- checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+ return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
} else {
// we do not trigger checkpoints here, we simply state whether we can trigger them
synchronized (lock) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 32b8612..573bd31 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -850,9 +850,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
@Override
public Future<Boolean> triggerCheckpointAsync(
- CheckpointMetaData checkpointMetaData,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
CompletableFuture<Boolean> result = new CompletableFuture<>();
mainMailboxExecutor.execute(
@@ -864,11 +862,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
System.currentTimeMillis()
- checkpointMetaData.getTimestamp());
try {
- result.complete(
- triggerCheckpoint(
- checkpointMetaData,
- checkpointOptions,
- advanceToEndOfEventTime));
+ result.complete(triggerCheckpoint(checkpointMetaData, checkpointOptions));
} catch (Exception ex) {
// Report the failure both via the Future result but also to the mailbox
result.completeExceptionally(ex);
@@ -882,9 +876,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
}
private boolean triggerCheckpoint(
- CheckpointMetaData checkpointMetaData,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime)
+ CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
throws Exception {
try {
// No alignment if we inject a checkpoint
@@ -895,11 +887,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
checkpointMetaData.getCheckpointId(), checkpointOptions);
boolean success =
- performCheckpoint(
- checkpointMetaData,
- checkpointOptions,
- checkpointMetrics,
- advanceToEndOfEventTime);
+ performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
if (!success) {
declineCheckpoint(checkpointMetaData.getCheckpointId());
}
@@ -945,8 +933,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
throws IOException {
try {
- if (performCheckpoint(
- checkpointMetaData, checkpointOptions, checkpointMetrics, false)) {
+ if (performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics)) {
if (isSynchronousSavepointId(checkpointMetaData.getCheckpointId())) {
runSynchronousSavepointMailboxLoop();
}
@@ -977,8 +964,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
private boolean performCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
- CheckpointMetrics checkpointMetrics,
- boolean advanceToEndOfTime)
+ CheckpointMetrics checkpointMetrics)
throws Exception {
LOG.debug(
@@ -993,7 +979,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
if (checkpointOptions.getCheckpointType().isSynchronous()) {
setSynchronousSavepointId(checkpointMetaData.getCheckpointId());
- if (advanceToEndOfTime) {
+ if (checkpointOptions.getCheckpointType().shouldAdvanceToEndOfTime()) {
advanceToEndOfEventTime();
}
} else if (activeSyncSavepointId != null
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 904654d..9e15fe2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -283,8 +283,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
new CheckpointMetaData(
0, System.currentTimeMillis()),
CheckpointOptions
- .forCheckpointWithDefaultLocation(),
- false)
+ .forCheckpointWithDefaultLocation())
.get()) {
LifecycleTrackingStreamSource.runFinish.trigger();
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 5af7bc9..19184ac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -526,7 +526,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
new CheckpointMetaData(checkpointId, checkpointTimestamp);
task.triggerCheckpointAsync(
- checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false);
+ checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
taskStateManagerMock.getWaitForReportLatch().await();
@@ -569,8 +569,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
restoredTask
.triggerCheckpointAsync(
new CheckpointMetaData(checkpointId, checkpointTimestamp),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false)
+ CheckpointOptions.forCheckpointWithDefaultLocation())
.get();
restoredTaskHarness.processElement(new StreamRecord<>(8, initialTime + 8));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
index 33ad7d0..6e939dd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
@@ -1108,9 +1108,7 @@ public abstract class CheckpointBarrierAlignerTestBase {
@Override
public Future<Boolean> triggerCheckpointAsync(
- CheckpointMetaData checkpointMetaData,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
throw new UnsupportedOperationException("should never be called");
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
index 180ce14..44dbe29 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
@@ -49,9 +49,7 @@ class CheckpointSequenceValidator extends AbstractInvokable {
@Override
public Future<Boolean> triggerCheckpointAsync(
- CheckpointMetaData checkpointMetaData,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
throw new UnsupportedOperationException("should never be called");
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index d935af9..1c9c270 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -607,9 +607,7 @@ public class OneInputStreamTaskTest extends TestLogger {
streamTask
.triggerCheckpointAsync(
- checkpointMetaData,
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false)
+ checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation())
.get();
// since no state was set, there shouldn't be restore calls
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
index ae3d14d..14c25d6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
@@ -285,7 +285,7 @@ public class RestoreStreamTaskTest extends TestLogger {
testHarness.taskStateManager.setWaitForReportLatch(new OneShotLatch());
streamTask.triggerCheckpointAsync(
- checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false);
+ checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
testHarness.taskStateManager.getWaitForReportLatch().await();
long reportedCheckpointId = testHarness.taskStateManager.getReportedCheckpointId();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
index 303cf37..7308e26 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
@@ -89,8 +89,7 @@ public class SourceExternalCheckpointTriggerTest {
sourceTask
.triggerCheckpointAsync(
new CheckpointMetaData(32, 829),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false)
+ CheckpointOptions.forCheckpointWithDefaultLocation())
.get());
// step by step let the source thread emit elements
@@ -111,8 +110,7 @@ public class SourceExternalCheckpointTriggerTest {
sourceTask
.triggerCheckpointAsync(
new CheckpointMetaData(34, 900),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false)
+ CheckpointOptions.forCheckpointWithDefaultLocation())
.get());
sync.trigger();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 123fcca..b5edd92 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -98,7 +98,7 @@ public class SourceOperatorStreamTaskTest {
final CheckpointOptions checkpointOptions =
new CheckpointOptions(
- CheckpointType.SYNC_SAVEPOINT,
+ CheckpointType.SAVEPOINT_TERMINATE,
CheckpointStorageLocationReference.getDefault());
triggerCheckpointWaitForFinish(testHarness, checkpointId, checkpointOptions);
@@ -202,7 +202,7 @@ public class SourceOperatorStreamTaskTest {
Future<Boolean> checkpointFuture =
testHarness
.getStreamTask()
- .triggerCheckpointAsync(checkpointMetaData, checkpointOptions, true);
+ .triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
// Wait until the checkpoint finishes.
// We have to mark the source reader as available here, otherwise the runMailboxStep() call
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index ebafdbf..8ba4739 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -76,7 +76,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-import static org.apache.flink.runtime.checkpoint.CheckpointType.SYNC_SAVEPOINT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT_SUSPEND;
import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -115,8 +115,7 @@ public class SourceStreamTaskTest {
Future<Boolean> triggerFuture =
harness.streamTask.triggerCheckpointAsync(
new CheckpointMetaData(1, 1),
- new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
- false);
+ new CheckpointOptions(SAVEPOINT_SUSPEND, getDefault()));
while (!triggerFuture.isDone()) {
harness.streamTask.runMailboxStep();
}
@@ -175,8 +174,7 @@ public class SourceStreamTaskTest {
Future<Boolean> triggerFuture =
harness.streamTask.triggerCheckpointAsync(
new CheckpointMetaData(1L, System.currentTimeMillis()),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false);
+ CheckpointOptions.forCheckpointWithDefaultLocation());
assertFalse(triggerFuture.isDone());
Thread.sleep(sleepTime);
@@ -590,8 +588,7 @@ public class SourceStreamTaskTest {
Future<Boolean> triggerFuture =
harness.streamTask.triggerCheckpointAsync(
new CheckpointMetaData(checkpointId, 1),
- new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
- false);
+ new CheckpointOptions(SAVEPOINT_SUSPEND, getDefault()));
while (!triggerFuture.isDone()) {
harness.streamTask.runMailboxStep();
}
@@ -711,8 +708,7 @@ public class SourceStreamTaskTest {
try {
sourceTask.triggerCheckpointAsync(
new CheckpointMetaData(currentCheckpointId, 0L),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false);
+ CheckpointOptions.forCheckpointWithDefaultLocation());
} catch (RejectedExecutionException e) {
// We are late with a checkpoint, the mailbox is already closed.
return false;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
index 43df989..9aa0fcc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
@@ -74,7 +74,7 @@ public class SourceTaskTerminationTest extends TestLogger {
stopWithSavepointStreamTaskTestHelper(false);
}
- private void stopWithSavepointStreamTaskTestHelper(final boolean withMaxWatermark)
+ private void stopWithSavepointStreamTaskTestHelper(final boolean shouldTerminate)
throws Exception {
final long syncSavepointId = 34L;
@@ -90,8 +90,7 @@ public class SourceTaskTerminationTest extends TestLogger {
srcTask.triggerCheckpointAsync(
new CheckpointMetaData(31L, 900),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false)
+ CheckpointOptions.forCheckpointWithDefaultLocation())
.get();
verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 31L);
@@ -101,12 +100,13 @@ public class SourceTaskTerminationTest extends TestLogger {
srcTask.triggerCheckpointAsync(
new CheckpointMetaData(syncSavepointId, 900),
new CheckpointOptions(
- CheckpointType.SYNC_SAVEPOINT,
- CheckpointStorageLocationReference.getDefault()),
- withMaxWatermark)
+ shouldTerminate
+ ? CheckpointType.SAVEPOINT_TERMINATE
+ : CheckpointType.SAVEPOINT_SUSPEND,
+ CheckpointStorageLocationReference.getDefault()))
.get();
- if (withMaxWatermark) {
+ if (shouldTerminate) {
// if we are in TERMINATE mode, we expect the source task
// to emit MAX_WM before the SYNC_SAVEPOINT barrier.
verifyWatermark(srcTaskTestHarness.getOutput(), Watermark.MAX_WATERMARK);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
index 1d7f33e..17b604d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
@@ -74,8 +74,7 @@ public class StreamTaskExecutionDecorationTest {
new CheckpointMetaData(1, 2),
new CheckpointOptions(
CheckpointType.CHECKPOINT,
- new CheckpointStorageLocationReference(new byte[] {1})),
- false);
+ new CheckpointStorageLocationReference(new byte[] {1})));
Assert.assertTrue("mailbox is empty", mailbox.hasMail());
Assert.assertFalse("execution decorator was called preliminary", decorator.wasCalled());
mailbox.drain()
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 12f3676..235cdef 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -199,8 +199,7 @@ public class StreamTaskTerminationTest extends TestLogger {
task.triggerCheckpointBarrier(
checkpointId,
checkpointTimestamp,
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false);
+ CheckpointOptions.forCheckpointWithDefaultLocation());
// wait until the task has completed execution
taskRun.get();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index bfd7d54..904097e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -154,7 +154,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-import static org.apache.flink.runtime.checkpoint.CheckpointType.SYNC_SAVEPOINT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT_SUSPEND;
import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY;
@@ -237,7 +237,7 @@ public class StreamTaskTest extends TestLogger {
try {
harness.streamTask.triggerCheckpointOnBarrier(
new CheckpointMetaData(checkpointId, checkpointId),
- new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
+ new CheckpointOptions(SAVEPOINT_SUSPEND, getDefault()),
new CheckpointMetrics());
} catch (IOException e) {
fail(e.getMessage());
@@ -649,8 +649,7 @@ public class StreamTaskTest extends TestLogger {
streamTask.triggerCheckpointAsync(
new CheckpointMetaData(42L, 1L),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false);
+ CheckpointOptions.forCheckpointWithDefaultLocation());
try {
task.waitForTaskCompletion(false);
@@ -701,8 +700,7 @@ public class StreamTaskTest extends TestLogger {
streamTask.triggerCheckpointAsync(
new CheckpointMetaData(42L, 1L),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false);
+ CheckpointOptions.forCheckpointWithDefaultLocation());
final Throwable uncaughtException = uncaughtExceptionHandler.waitForUncaughtException();
assertThat(uncaughtException, is(failingCause));
@@ -752,8 +750,7 @@ public class StreamTaskTest extends TestLogger {
streamTask
.triggerCheckpointAsync(
new CheckpointMetaData(42L, 1L),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false)
+ CheckpointOptions.forCheckpointWithDefaultLocation())
.get();
// wait for the completion of the async task
@@ -868,8 +865,7 @@ public class StreamTaskTest extends TestLogger {
final long checkpointId = 42L;
streamTask.triggerCheckpointAsync(
new CheckpointMetaData(checkpointId, 1L),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false);
+ CheckpointOptions.forCheckpointWithDefaultLocation());
acknowledgeCheckpointLatch.await();
@@ -958,8 +954,7 @@ public class StreamTaskTest extends TestLogger {
final long checkpointId = 42L;
task.streamTask.triggerCheckpointAsync(
new CheckpointMetaData(checkpointId, 1L),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false);
+ CheckpointOptions.forCheckpointWithDefaultLocation());
rawKeyedStateHandleFuture.awaitRun();
@@ -1044,8 +1039,7 @@ public class StreamTaskTest extends TestLogger {
task.streamTask.triggerCheckpointAsync(
new CheckpointMetaData(42L, 1L),
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false);
+ CheckpointOptions.forCheckpointWithDefaultLocation());
checkpointCompletedLatch.await(30, TimeUnit.SECONDS);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index ba122d5..9b5452f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -108,9 +108,8 @@ public class SynchronousCheckpointITCase {
42,
156865867234L,
new CheckpointOptions(
- CheckpointType.SYNC_SAVEPOINT,
- CheckpointStorageLocationReference.getDefault()),
- true);
+ CheckpointType.SAVEPOINT_SUSPEND,
+ CheckpointStorageLocationReference.getDefault()));
assertThat(eventQueue.take(), is(Event.PRE_TRIGGER_CHECKPOINT));
assertThat(eventQueue.take(), is(Event.POST_TRIGGER_CHECKPOINT));
@@ -153,14 +152,11 @@ public class SynchronousCheckpointITCase {
@Override
public Future<Boolean> triggerCheckpointAsync(
- CheckpointMetaData checkpointMetaData,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
try {
eventQueue.put(Event.PRE_TRIGGER_CHECKPOINT);
Future<Boolean> result =
- super.triggerCheckpointAsync(
- checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+ super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
eventQueue.put(Event.POST_TRIGGER_CHECKPOINT);
return result;
} catch (InterruptedException e) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
index 5d75d38..0a4949c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
@@ -111,9 +111,8 @@ public class SynchronousCheckpointTest {
streamTaskUnderTest.triggerCheckpointAsync(
new CheckpointMetaData(42, System.currentTimeMillis()),
new CheckpointOptions(
- CheckpointType.SYNC_SAVEPOINT,
- CheckpointStorageLocationReference.getDefault()),
- false);
+ CheckpointType.SAVEPOINT_SUSPEND,
+ CheckpointStorageLocationReference.getDefault()));
waitForSyncSavepointIdToBeSet(streamTaskUnderTest);
}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index 53a0612..f72d53a 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -351,19 +351,16 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
@Override
public Future<Boolean> triggerCheckpointAsync(
- CheckpointMetaData checkpointMetaData,
- CheckpointOptions checkpointOptions,
- boolean advanceToEndOfEventTime) {
+ CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
final long checkpointId = checkpointMetaData.getCheckpointId();
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
- if (checkpointType == CheckpointType.SYNC_SAVEPOINT) {
+ if (checkpointType == CheckpointType.SAVEPOINT_SUSPEND) {
synchronousSavepointId = checkpointId;
syncSavepointId.compareAndSet(-1, synchronousSavepointId);
}
- return super.triggerCheckpointAsync(
- checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
+ return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
}
@Override
@@ -435,8 +432,7 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
@Override
public Future<Boolean> triggerCheckpointAsync(
final CheckpointMetaData checkpointMetaData,
- final CheckpointOptions checkpointOptions,
- final boolean advanceToEndOfEventTime) {
+ final CheckpointOptions checkpointOptions) {
final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
if (taskIndex == 0) {
checkpointsToWaitFor.countDown();
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index ecb9d5e..cd79163 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -239,8 +239,7 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase {
@Override
public Future<Boolean> triggerCheckpointAsync(
final CheckpointMetaData checkpointMetaData,
- final CheckpointOptions checkpointOptions,
- final boolean advanceToEndOfEventTime) {
+ final CheckpointOptions checkpointOptions) {
final TaskStateSnapshot checkpointStateHandles = new TaskStateSnapshot();
checkpointStateHandles.putSubtaskStateByOperatorID(
OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()),
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
index dd282e7..95886dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
@@ -183,9 +183,7 @@ public class StatefulOperatorChainedTaskTest {
while (!streamTask
.triggerCheckpointAsync(
- checkpointMetaData,
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- false)
+ checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation())
.get()) {}
testHarness.getTaskStateManager().getWaitForReportLatch().await();