You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/05/25 22:21:22 UTC
[flink] 01/02: [FLINK-22684][runtime] Added ability to ignore
in-flight data during the recovery
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 11f59bf60a0bab9f921cce9b43f75e57fb099a47
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed May 19 17:32:09 2021 +0200
[FLINK-22684][runtime] Added ability to ignore in-flight data during the recovery
---
.../execution_checkpointing_configuration.html | 6 +
.../jobmanager/JMXJobManagerMetricTest.java | 1 +
.../runtime/checkpoint/CheckpointCoordinator.java | 33 +++-
.../runtime/checkpoint/OperatorSubtaskState.java | 12 ++
.../tasks/CheckpointCoordinatorConfiguration.java | 34 +++-
.../CheckpointCoordinatorMasterHooksTest.java | 1 +
.../CheckpointCoordinatorRestoringTest.java | 140 +++++++++++++--
.../CheckpointSettingsSerializableTest.java | 1 +
.../checkpoint/CheckpointStatsTrackerTest.java | 1 +
.../ExecutionGraphCheckpointCoordinatorTest.java | 1 +
.../FailoverStrategyCheckpointCoordinatorTest.java | 1 +
.../checkpoint/OperatorSubtaskStateTest.java | 54 ++++++
.../executiongraph/ArchivedExecutionGraphTest.java | 1 +
.../DefaultExecutionGraphDeploymentTest.java | 1 +
.../flink/runtime/jobgraph/JobGraphTest.java | 1 +
.../tasks/JobCheckpointingSettingsTest.java | 1 +
.../runtime/scheduler/SchedulerTestingUtils.java | 1 +
.../api/environment/CheckpointConfig.java | 30 ++++
.../environment/ExecutionCheckpointingOptions.java | 17 ++
.../api/graph/StreamingJobGraphGenerator.java | 2 +
.../JobMasterStopWithSavepointITCase.java | 1 +
.../jobmaster/JobMasterTriggerSavepointITCase.java | 1 +
.../checkpointing/IgnoreInFlightDataITCase.java | 189 +++++++++++++++++++++
23 files changed, 508 insertions(+), 22 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
index c9786ab..88dfc04 100644
--- a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
@@ -21,6 +21,12 @@
<td>Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status <code class="highlighter-rouge">JobStatus#FAILED</code> or <code class="highlighter-rouge">JobStatus#SUSPENDED</code>. In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.<br /><br />The mode defines how an externalized checkpoint should [...]
</tr>
<tr>
+ <td><h5>execution.checkpointing.id-of-ignored-in-flight-data</h5></td>
+ <td style="word-wrap: break-word;">-1</td>
+ <td>Long</td>
+ <td>Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.<br /><br />It is better to keep this value empty until there is explicit needs to restore from the specific checkpoint without in-flight data.<br /></td>
+ </tr>
+ <tr>
<td><h5>execution.checkpointing.interval</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 4ae173a..4044bf5 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -101,6 +101,7 @@ public class JMXJobManagerMetricTest extends TestLogger {
true,
false,
false,
+ 0,
0),
null);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index b31dd8f..c5c1e7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -206,6 +206,9 @@ public class CheckpointCoordinator {
private boolean isPreferCheckpointForRecovery;
+ /** Id of checkpoint for which in-flight data should be ignored on recovery. */
+ private final long checkpointIdOfIgnoredInFlightData;
+
private final CheckpointFailureManager failureManager;
private final Clock clock;
@@ -309,6 +312,7 @@ public class CheckpointCoordinator {
this.isExactlyOnceMode = chkConfig.isExactlyOnce();
this.unalignedCheckpointsEnabled = chkConfig.isUnalignedCheckpointsEnabled();
this.alignmentTimeout = chkConfig.getAlignmentTimeout();
+ this.checkpointIdOfIgnoredInFlightData = chkConfig.getCheckpointIdOfIgnoredInFlightData();
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.masterHooks = new HashMap<>();
@@ -1553,7 +1557,7 @@ public class CheckpointCoordinator {
LOG.info("Restoring job {} from {}.", job, latest);
// re-assign the task states
- final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
+ final Map<OperatorID, OperatorState> operatorStates = extractOperatorStates(latest);
StateAssignmentOperation stateAssignmentOperation =
new StateAssignmentOperation(
@@ -1595,6 +1599,33 @@ public class CheckpointCoordinator {
}
}
+ private Map<OperatorID, OperatorState> extractOperatorStates(CompletedCheckpoint checkpoint) {
+ Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
+
+ if (checkpoint.getCheckpointID() == checkpointIdOfIgnoredInFlightData) {
+ // This is the checkpoint configured to be restored without in-flight data: every subtask state that carries input-channel or result-subpartition state is replaced (putState under the same subtask key) by a copy in which both collections are empty; all other state is kept via toBuilder().
+ for (OperatorState operatorState : operatorStates.values()) {
+ for (Map.Entry<Integer, OperatorSubtaskState> subtaskStateEntry :
+ operatorState.getSubtaskStates().entrySet()) {
+
+ OperatorSubtaskState subtaskState = subtaskStateEntry.getValue();
+ if (!subtaskState.getResultSubpartitionState().isEmpty()
+ || !subtaskState.getInputChannelState().isEmpty()) {
+ operatorState.putState(
+ subtaskStateEntry.getKey(),
+ subtaskState
+ .toBuilder()
+ .setResultSubpartitionState(StateObjectCollection.empty())
+ .setInputChannelState(StateObjectCollection.empty())
+ .build());
+ }
+ }
+ }
+ }
+
+ return operatorStates;
+ }
+
/**
* Restore the state with given savepoint.
*
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index b6a9f64..9d0739a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -307,6 +307,18 @@ public class OperatorSubtaskState implements CompositeStateHandle {
|| resultSubpartitionState.hasState();
}
+ public Builder toBuilder() {
+ return builder()
+ .setManagedKeyedState(managedKeyedState)
+ .setManagedOperatorState(managedOperatorState)
+ .setRawOperatorState(rawOperatorState)
+ .setRawKeyedState(rawKeyedState)
+ .setInputChannelState(inputChannelState)
+ .setResultSubpartitionState(resultSubpartitionState)
+ .setInputRescalingDescriptor(inputRescalingDescriptor)
+ .setOutputRescalingDescriptor(outputRescalingDescriptor);
+ }
+
public static Builder builder() {
return new Builder();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index f679072..0fd6f87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -64,6 +64,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
private final long alignmentTimeout;
+ private final long checkpointIdOfIgnoredInFlightData;
+
/** @deprecated use {@link #builder()}. */
@Deprecated
@VisibleForTesting
@@ -76,7 +78,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
boolean isExactlyOnce,
boolean isUnalignedCheckpoint,
boolean isPreferCheckpointForRecovery,
- int tolerableCpFailureNumber) {
+ int tolerableCpFailureNumber,
+ long checkpointIdOfIgnoredInFlightData) {
this(
checkpointInterval,
checkpointTimeout,
@@ -87,7 +90,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
isPreferCheckpointForRecovery,
tolerableCpFailureNumber,
isUnalignedCheckpoint,
- 0);
+ 0,
+ checkpointIdOfIgnoredInFlightData);
}
private CheckpointCoordinatorConfiguration(
@@ -100,7 +104,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
boolean isPreferCheckpointForRecovery,
int tolerableCpFailureNumber,
boolean isUnalignedCheckpointsEnabled,
- long alignmentTimeout) {
+ long alignmentTimeout,
+ long checkpointIdOfIgnoredInFlightData) {
// sanity checks
if (checkpointInterval < MINIMAL_CHECKPOINT_TIME
@@ -124,6 +129,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
this.tolerableCheckpointFailureNumber = tolerableCpFailureNumber;
this.isUnalignedCheckpointsEnabled = isUnalignedCheckpointsEnabled;
this.alignmentTimeout = alignmentTimeout;
+ this.checkpointIdOfIgnoredInFlightData = checkpointIdOfIgnoredInFlightData;
}
public long getCheckpointInterval() {
@@ -166,6 +172,10 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
return alignmentTimeout;
}
+ public long getCheckpointIdOfIgnoredInFlightData() {
+ return checkpointIdOfIgnoredInFlightData;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -183,7 +193,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
&& isUnalignedCheckpointsEnabled == that.isUnalignedCheckpointsEnabled
&& checkpointRetentionPolicy == that.checkpointRetentionPolicy
&& isPreferCheckpointForRecovery == that.isPreferCheckpointForRecovery
- && tolerableCheckpointFailureNumber == that.tolerableCheckpointFailureNumber;
+ && tolerableCheckpointFailureNumber == that.tolerableCheckpointFailureNumber
+ && checkpointIdOfIgnoredInFlightData == that.checkpointIdOfIgnoredInFlightData;
}
@Override
@@ -197,7 +208,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
isExactlyOnce,
isUnalignedCheckpointsEnabled,
isPreferCheckpointForRecovery,
- tolerableCheckpointFailureNumber);
+ tolerableCheckpointFailureNumber,
+ checkpointIdOfIgnoredInFlightData);
}
@Override
@@ -221,6 +233,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
+ isPreferCheckpointForRecovery
+ ", tolerableCheckpointFailureNumber="
+ tolerableCheckpointFailureNumber
+ + ", checkpointIdOfIgnoredInFlightData="
+ + checkpointIdOfIgnoredInFlightData
+ '}';
}
@@ -241,6 +255,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
private int tolerableCheckpointFailureNumber;
private boolean isUnalignedCheckpointsEnabled;
private long alignmentTimeout = 0;
+ private long checkpointIdOfIgnoredInFlightData;
public CheckpointCoordinatorConfiguration build() {
return new CheckpointCoordinatorConfiguration(
@@ -253,7 +268,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
isPreferCheckpointForRecovery,
tolerableCheckpointFailureNumber,
isUnalignedCheckpointsEnabled,
- alignmentTimeout);
+ alignmentTimeout,
+ checkpointIdOfIgnoredInFlightData);
}
public CheckpointCoordinatorConfigurationBuilder setCheckpointInterval(
@@ -314,5 +330,11 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
this.alignmentTimeout = alignmentTimeout;
return this;
}
+
+ public CheckpointCoordinatorConfigurationBuilder setCheckpointIdOfIgnoredInFlightData(
+ long checkpointIdOfIgnoredInFlightData) {
+ this.checkpointIdOfIgnoredInFlightData = checkpointIdOfIgnoredInFlightData;
+ return this;
+ }
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 0dd7104..86337f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -474,6 +474,7 @@ public class CheckpointCoordinatorMasterHooksTest {
true,
false,
false,
+ 0,
0);
Executor executor = Executors.directExecutor();
return new CheckpointCoordinator(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 4bc02b6..6559166 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -59,11 +59,13 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static java.util.Collections.singletonList;
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.compareKeyedState;
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.comparePartitionableState;
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateChainedPartitionableStateHandle;
@@ -71,6 +73,8 @@ import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUt
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle;
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockSubtaskState;
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.verifyStateRestore;
+import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
+import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -283,8 +287,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
long checkpointId = checkpointIDCounter.getLast();
KeyGroupRange keyGroupRange = KeyGroupRange.of(0, 0);
- List<SerializableObject> testStates =
- Collections.singletonList(new SerializableObject());
+ List<SerializableObject> testStates = singletonList(new SerializableObject());
KeyedStateHandle serializedKeyGroupStates =
generateKeyGroupState(keyGroupRange, testStates);
@@ -319,7 +322,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
KeyGroupRange keyGroupRangeForSavepoint = KeyGroupRange.of(1, 1);
List<SerializableObject> testStatesForSavepoint =
- Collections.singletonList(new SerializableObject());
+ singletonList(new SerializableObject());
KeyedStateHandle serializedKeyGroupStatesForSavepoint =
generateKeyGroupState(keyGroupRangeForSavepoint, testStatesForSavepoint);
@@ -441,6 +444,9 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
.setManagedOperatorState(opStateBackend)
.setManagedKeyedState(keyedStateBackend)
.setRawKeyedState(keyedStateRaw)
+ .setInputChannelState(
+ StateObjectCollection.singleton(
+ createNewInputChannelStateHandle(3, new Random())))
.build();
TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
taskOperatorSubtaskStates.putSubtaskStateByOperatorID(
@@ -474,10 +480,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
generatePartitionableStateHandle(jobVertexID2, index, 2, 8, false);
OperatorStateHandle opStateRaw =
generatePartitionableStateHandle(jobVertexID2, index, 2, 8, true);
- expectedOpStatesBackend.add(
- new ChainedStateHandle<>(Collections.singletonList(opStateBackend)));
- expectedOpStatesRaw.add(
- new ChainedStateHandle<>(Collections.singletonList(opStateRaw)));
+ expectedOpStatesBackend.add(new ChainedStateHandle<>(singletonList(opStateBackend)));
+ expectedOpStatesRaw.add(new ChainedStateHandle<>(singletonList(opStateRaw)));
OperatorSubtaskState operatorSubtaskState =
OperatorSubtaskState.builder()
@@ -574,11 +578,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
if (idx == headOpIndex) {
Collection<KeyedStateHandle> keyedStateBackend = opState.getManagedKeyedState();
Collection<KeyedStateHandle> keyGroupStateRaw = opState.getRawKeyedState();
- compareKeyedState(
- Collections.singletonList(originalKeyedStateBackend),
- keyedStateBackend);
- compareKeyedState(
- Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw);
+ compareKeyedState(singletonList(originalKeyedStateBackend), keyedStateBackend);
+ compareKeyedState(singletonList(originalKeyedStateRaw), keyGroupStateRaw);
}
}
actualOpStatesBackend.add(allParallelManagedOpStates);
@@ -1032,13 +1033,122 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
Collection<KeyedStateHandle> keyedStateBackend = headOpState.getManagedKeyedState();
Collection<KeyedStateHandle> keyGroupStateRaw = headOpState.getRawKeyedState();
- compareKeyedState(
- Collections.singletonList(originalKeyedStateBackend), keyedStateBackend);
- compareKeyedState(Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw);
+ compareKeyedState(singletonList(originalKeyedStateBackend), keyedStateBackend);
+ compareKeyedState(singletonList(originalKeyedStateRaw), keyGroupStateRaw);
}
comparePartitionableState(
expectedManagedOperatorStates.get(0), actualManagedOperatorStates);
comparePartitionableState(expectedRawOperatorStates.get(0), actualRawOperatorStates);
}
+
+ @Test
+ public void testRestoreLatestCheckpointedStateWithoutInFlightData() throws Exception {
+ // given: Operator with not empty states.
+ final JobVertexID jobVertexID = new JobVertexID();
+ int parallelism1 = 3;
+ int maxParallelism1 = 42;
+
+ CompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore();
+
+ final ExecutionGraph graph =
+ new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID, parallelism1, maxParallelism1)
+ .build();
+
+ final ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
+
+ // set up the coordinator and validate the initial state
+ CheckpointCoordinator coord =
+ new CheckpointCoordinatorBuilder()
+ .setExecutionGraph(graph)
+ .setCompletedCheckpointStore(completedCheckpointStore)
+ .setCheckpointCoordinatorConfiguration(
+ new CheckpointCoordinatorConfigurationBuilder()
+ .setCheckpointIdOfIgnoredInFlightData(1)
+ .build())
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .build();
+
+ // trigger the checkpoint
+ coord.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ assertEquals(1, coord.getPendingCheckpoints().size());
+ long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+
+ List<KeyGroupRange> keyGroupPartitions1 =
+ StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
+
+ Random random = new Random();
+ // fill the states and complete the checkpoint.
+ for (int index = 0; index < jobVertex.getParallelism(); index++) {
+ OperatorSubtaskState operatorSubtaskState =
+ OperatorSubtaskState.builder()
+ .setManagedOperatorState(
+ generatePartitionableStateHandle(
+ jobVertexID, index, 2, 8, false))
+ .setRawOperatorState(
+ generatePartitionableStateHandle(
+ jobVertexID, index, 2, 8, true))
+ .setManagedKeyedState(
+ generateKeyGroupState(
+ jobVertexID, keyGroupPartitions1.get(index), false))
+ .setRawKeyedState(
+ generateKeyGroupState(
+ jobVertexID, keyGroupPartitions1.get(index), true))
+ .setInputChannelState(
+ StateObjectCollection.singleton(
+ createNewInputChannelStateHandle(3, random)))
+ .setResultSubpartitionState(
+ StateObjectCollection.singleton(
+ createNewResultSubpartitionStateHandle(3, random)))
+ .build();
+ TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
+ taskOperatorSubtaskStates.putSubtaskStateByOperatorID(
+ OperatorID.fromJobVertexID(jobVertexID), operatorSubtaskState);
+
+ AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(
+ graph.getJobID(),
+ jobVertex
+ .getTaskVertices()[index]
+ .getCurrentExecutionAttempt()
+ .getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ taskOperatorSubtaskStates);
+
+ coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
+ }
+
+ assertEquals(1, coord.getSuccessfulCheckpoints().size());
+
+ // when: Restore latest checkpoint without in-flight data.
+ Set<ExecutionJobVertex> tasks = new HashSet<>();
+ tasks.add(jobVertex);
+ assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
+
+ // then: All states should be restored successfully except InputChannel and
+ // ResultSubpartition which should be ignored.
+ verifyStateRestore(jobVertexID, jobVertex, keyGroupPartitions1);
+ for (int i = 0; i < jobVertex.getParallelism(); i++) {
+ JobManagerTaskRestore taskRestore =
+ jobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
+ Assert.assertEquals(1L, taskRestore.getRestoreCheckpointId());
+ TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
+
+ OperatorSubtaskState operatorState =
+ stateSnapshot.getSubtaskStateByOperatorID(
+ OperatorID.fromJobVertexID(jobVertexID));
+
+ assertTrue(operatorState.getInputChannelState().isEmpty());
+ assertTrue(operatorState.getResultSubpartitionState().isEmpty());
+
+ assertFalse(operatorState.getRawOperatorState().isEmpty());
+ assertFalse(operatorState.getManagedOperatorState().isEmpty());
+ assertFalse(operatorState.getRawKeyedState().isEmpty());
+ assertFalse(operatorState.getManagedKeyedState().isEmpty());
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index d0de6d1..9950f2c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -86,6 +86,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
true,
false,
false,
+ 0,
0),
new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath)),
new SerializedValue<CheckpointStorage>(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 3c134cb..927b809 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -60,6 +60,7 @@ public class CheckpointStatsTrackerTest {
false,
false,
false,
+ 0,
0),
null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index d53be67..50c7dd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -145,6 +145,7 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger {
true,
false,
false,
+ 0,
0);
final JobCheckpointingSettings checkpointingSettings =
new JobCheckpointingSettings(chkConfig, null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
index 9396ef0..ee71761 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
@@ -70,6 +70,7 @@ public class FailoverStrategyCheckpointCoordinatorTest extends TestLogger {
true,
false,
false,
+ 0,
0);
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinator(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java
index 274e7fd..a764c76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java
@@ -19,16 +19,28 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.InputChannelStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Random;
+
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.builder.EqualsBuilder.reflectionEquals;
+import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateKeyGroupState;
+import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle;
+import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
+import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
/** {@link OperatorSubtaskState} test. */
public class OperatorSubtaskStateTest {
@@ -50,6 +62,48 @@ public class OperatorSubtaskStateTest {
.discardState();
}
+ @Test
+ public void testToBuilderCorrectness() throws IOException {
+ // given: Initialized operator subtask state.
+ JobVertexID jobVertexID = new JobVertexID();
+ int index = 0;
+ Random random = new Random();
+
+ OperatorSubtaskState operatorSubtaskState =
+ OperatorSubtaskState.builder()
+ .setManagedOperatorState(
+ generatePartitionableStateHandle(jobVertexID, index, 2, 8, false))
+ .setRawOperatorState(
+ generatePartitionableStateHandle(jobVertexID, index, 2, 8, true))
+ .setManagedKeyedState(
+ generateKeyGroupState(jobVertexID, new KeyGroupRange(0, 11), false))
+ .setRawKeyedState(
+ generateKeyGroupState(jobVertexID, new KeyGroupRange(0, 9), true))
+ .setInputChannelState(
+ StateObjectCollection.singleton(
+ createNewInputChannelStateHandle(3, random)))
+ .setResultSubpartitionState(
+ StateObjectCollection.singleton(
+ createNewResultSubpartitionStateHandle(3, random)))
+ .setInputRescalingDescriptor(
+ new InflightDataRescalingDescriptor(
+ new int[1],
+ new RescaleMappings[0],
+ Collections.singleton(1)))
+ .setOutputRescalingDescriptor(
+ new InflightDataRescalingDescriptor(
+ new int[1],
+ new RescaleMappings[0],
+ Collections.singleton(2)))
+ .build();
+
+ // when: Copy the operator subtask state.
+ OperatorSubtaskState operatorSubtaskStateCopy = operatorSubtaskState.toBuilder().build();
+
+ // then: It should be equal to original one.
+ assertTrue(reflectionEquals(operatorSubtaskState, operatorSubtaskStateCopy));
+ }
+
private ResultSubpartitionStateHandle buildSubpartitionHandle(
StreamStateHandle delegate, int subPartitionIdx1) {
return new ResultSubpartitionStateHandle(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 3292579..073d214 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -101,6 +101,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
true,
false,
false,
+ 0,
0);
JobCheckpointingSettings checkpointingSettings =
new JobCheckpointingSettings(chkConfig, null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
index ec722d8..3bbd94f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
@@ -659,6 +659,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger {
false,
false,
false,
+ 0,
0),
null));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index d779c99..dd4424b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -398,6 +398,7 @@ public class JobGraphTest extends TestLogger {
true,
false,
false,
+ 0,
0);
return new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
index 3677785..8a7a738 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -45,6 +45,7 @@ public class JobCheckpointingSettingsTest {
false,
false,
false,
+ 0,
0),
new SerializedValue<>(new MemoryStateBackend()));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index ac5b1cd..80e8652c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -162,6 +162,7 @@ public class SchedulerTestingUtils {
false,
false,
false,
+ 0,
0);
SerializedValue<StateBackend> serializedStateBackend = null;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 76abc50..02ee15b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -91,6 +91,9 @@ public class CheckpointConfig implements java.io.Serializable {
/** Flag to enable unaligned checkpoints. */
private boolean unalignedCheckpointsEnabled;
+ /** Id of checkpoint for which in-flight data should be ignored on recovery. */
+ private long checkpointIdOfIgnoredInFlightData;
+
private Duration alignmentTimeout =
ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT.defaultValue();
@@ -148,6 +151,8 @@ public class CheckpointConfig implements java.io.Serializable {
this.forceCheckpointing = checkpointConfig.forceCheckpointing;
this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
this.storage = checkpointConfig.getCheckpointStorage();
+ this.checkpointIdOfIgnoredInFlightData =
+ checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
}
public CheckpointConfig() {}
@@ -668,6 +673,28 @@ public class CheckpointConfig implements java.io.Serializable {
return this.storage;
}
+ /**
+ * Sets the id of the checkpoint whose in-flight data will be ignored by all operators when the
+ * job is recovered from that checkpoint.
+ *
+ * @param checkpointIdOfIgnoredInFlightData Checkpoint id for which in-flight data should be
+ * ignored.
+ * @see #getCheckpointIdOfIgnoredInFlightData
+ */
+ @PublicEvolving
+ public void setCheckpointIdOfIgnoredInFlightData(long checkpointIdOfIgnoredInFlightData) {
+ this.checkpointIdOfIgnoredInFlightData = checkpointIdOfIgnoredInFlightData;
+ }
+
+ /**
+ * @return Id of the checkpoint whose in-flight data is ignored on recovery from it.
+ * @see #setCheckpointIdOfIgnoredInFlightData
+ */
+ @PublicEvolving
+ public long getCheckpointIdOfIgnoredInFlightData() {
+ return checkpointIdOfIgnoredInFlightData;
+ }
+
/** Cleanup behaviour for externalized checkpoints when the job is cancelled. */
@PublicEvolving
public enum ExternalizedCheckpointCleanup {
@@ -750,6 +777,9 @@ public class CheckpointConfig implements java.io.Serializable {
.getOptional(ExecutionCheckpointingOptions.ENABLE_UNALIGNED)
.ifPresent(this::enableUnalignedCheckpoints);
configuration
+ .getOptional(ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA)
+ .ifPresent(this::setCheckpointIdOfIgnoredInFlightData);
+ configuration
.getOptional(ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT)
.ifPresent(this::setAlignmentTimeout);
configuration
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
index 7cdd194..40c5a6e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
@@ -196,4 +196,21 @@ public class ExecutionCheckpointingOptions {
.text(
"Forces unaligned checkpoints, particularly allowing them for iterative jobs.")
.build());
+
+ public static final ConfigOption<Long> CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
+ ConfigOptions.key("execution.checkpointing.id-of-ignored-in-flight-data")
+ .longType()
+ .defaultValue(-1L)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.")
+ .linebreak()
+ .linebreak()
+ .text(
+ "It is better to keep this value empty until "
+ + "there is explicit needs to restore from "
+ + "the specific checkpoint without in-flight data.")
+ .linebreak()
+ .build());
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 425350a..f89641d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -1312,6 +1312,8 @@ public class StreamingJobGraphGenerator {
.setTolerableCheckpointFailureNumber(
cfg.getTolerableCheckpointFailureNumber())
.setUnalignedCheckpointsEnabled(cfg.isUnalignedCheckpointsEnabled())
+ .setCheckpointIdOfIgnoredInFlightData(
+ cfg.getCheckpointIdOfIgnoredInFlightData())
.setAlignmentTimeout(cfg.getAlignmentTimeout().toMillis())
.build(),
serializedStateBackend,
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
index 4820eb0..97c8f01 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
@@ -297,6 +297,7 @@ public class JobMasterStopWithSavepointITCase extends AbstractTestBase {
true,
false,
false,
+ 0,
0),
null);
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index 39ea862..6cb55a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -108,6 +108,7 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase {
true,
false,
false,
+ 0,
0),
null);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
new file mode 100644
index 0000000..a8bdf6c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.concurrent.locks.LockSupport;
+
+import static java.util.Collections.singletonList;
+import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test of ignoring in-flight data during recovery.
+ *
+ * <p>The source fails the snapshot that follows its first recorded one, so that the job fails and
+ * is restarted; checkpoint id 1 is configured as the checkpoint whose in-flight data is ignored.
+ */
+public class IgnoreInFlightDataITCase extends TestLogger {
+    @ClassRule
+    public static final MiniClusterWithClientResource CLUSTER =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getConfiguration())
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(2)
+                            .build());
+
+    private static Configuration getConfiguration() {
+        Configuration config = new Configuration();
+        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("48m"));
+        return config;
+    }
+
+    @Test
+    public void testIgnoreInFlightDataDuringRecovery() throws Exception {
+        // given: Stream which will fail after first checkpoint.
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(3);
+        env.enableCheckpointing(10);
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+        env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
+        env.setRestartStrategy(fixedDelayRestart(2, 0));
+
+        env.addSource(new NumberSource())
+                .shuffle()
+                // map for having parallel execution.
+                .map(new SlowMap())
+                .addSink(new SumFailSink())
+                // one sink for easy calculation.
+                .setParallelism(1);
+
+        // when: Job is executed.
+        env.execute("Total sum");
+
+        // Calculate the expected single value after recovery.
+        int sourceValueAfterRestore = NumberSource.lastCheckpointedValue + 1;
+
+        // Calculate result in case of normal recovery.
+        long resultWithoutIgnoringData = 0;
+        for (int i = 0; i <= sourceValueAfterRestore; i++) {
+            resultWithoutIgnoringData += i;
+        }
+
+        // then: Actual result should be less than the ideal result because some of data was
+        // ignored.
+        assertThat(SumFailSink.result, lessThan(resultWithoutIgnoringData));
+
+        // and: Actual result should be equal to sum of result before fail + source value after
+        // recovery.
+        long expectedResult = SumFailSink.resultBeforeFail + sourceValueAfterRestore;
+        assertEquals(expectedResult, SumFailSink.result);
+    }
+
+    /** Sink that sums up the received values and remembers the sum at every snapshot. */
+    private static class SumFailSink implements SinkFunction<Integer>, CheckpointedFunction {
+        private static final long serialVersionUID = 1L;
+
+        // Static so that the test method can read the values once the job has finished.
+        public static long result;
+        public static long resultBeforeFail;
+
+        @Override
+        public void invoke(Integer value) throws Exception {
+            result += value;
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            resultBeforeFail = result;
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            // Continue from the sum remembered by the latest snapshot.
+            result = resultBeforeFail;
+        }
+    }
+
+    /**
+     * Source of consecutive numbers. Without restored state it emits until it is cancelled; with
+     * restored state it emits exactly one more value and finishes.
+     */
+    private static class NumberSource implements SourceFunction<Integer>, CheckpointedFunction {
+
+        private static final long serialVersionUID = 1L;
+        private ListState<Integer> valueState;
+        public static int lastCheckpointedValue;
+
+        /** Cleared by {@link #cancel()} so that the emitting loop terminates. */
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            Iterator<Integer> stateIt = valueState.get().iterator();
+            boolean isRecovered = stateIt.hasNext();
+
+            if (isRecovered) {
+                Integer lastValue = stateIt.next();
+
+                // Checking that ListState is recovered correctly.
+                assertEquals(lastCheckpointedValue, lastValue.intValue());
+
+                // if it is started after recovery, just send one more value and finish.
+                ctx.collect(lastValue + 1);
+            } else {
+                int next = 0;
+                // Stop as soon as cancel() was called instead of spinning until interrupted.
+                while (running) {
+                    // State update and emission are done together under the checkpoint lock.
+                    synchronized (ctx.getCheckpointLock()) {
+                        next++;
+                        valueState.update(singletonList(next));
+                        ctx.collect(next);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            // A value was already recorded by an earlier snapshot: fail this one on purpose.
+            if (lastCheckpointedValue > 0) {
+                throw new RuntimeException("Error during snapshot");
+            }
+
+            lastCheckpointedValue = valueState.get().iterator().next();
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            this.valueState =
+                    context.getOperatorStateStore()
+                            .getListState(new ListStateDescriptor<>("state", Types.INT));
+        }
+    }
+
+    /** Identity map that delays every record. */
+    private static class SlowMap extends RichMapFunction<Integer, Integer> {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Integer map(Integer value) throws Exception {
+            // slow down the map in order to have more intermediate data.
+            LockSupport.parkNanos(100000);
+            return value;
+        }
+    }
+}