Posted to commits@flink.apache.org by ro...@apache.org on 2021/05/25 22:21:22 UTC

[flink] 01/02: [FLINK-22684][runtime] Added ability to ignore in-flight data during the recovery

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 11f59bf60a0bab9f921cce9b43f75e57fb099a47
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed May 19 17:32:09 2021 +0200

    [FLINK-22684][runtime] Added ability to ignore in-flight data during the recovery
---
 .../execution_checkpointing_configuration.html     |   6 +
 .../jobmanager/JMXJobManagerMetricTest.java        |   1 +
 .../runtime/checkpoint/CheckpointCoordinator.java  |  33 +++-
 .../runtime/checkpoint/OperatorSubtaskState.java   |  12 ++
 .../tasks/CheckpointCoordinatorConfiguration.java  |  34 +++-
 .../CheckpointCoordinatorMasterHooksTest.java      |   1 +
 .../CheckpointCoordinatorRestoringTest.java        | 140 +++++++++++++--
 .../CheckpointSettingsSerializableTest.java        |   1 +
 .../checkpoint/CheckpointStatsTrackerTest.java     |   1 +
 .../ExecutionGraphCheckpointCoordinatorTest.java   |   1 +
 .../FailoverStrategyCheckpointCoordinatorTest.java |   1 +
 .../checkpoint/OperatorSubtaskStateTest.java       |  54 ++++++
 .../executiongraph/ArchivedExecutionGraphTest.java |   1 +
 .../DefaultExecutionGraphDeploymentTest.java       |   1 +
 .../flink/runtime/jobgraph/JobGraphTest.java       |   1 +
 .../tasks/JobCheckpointingSettingsTest.java        |   1 +
 .../runtime/scheduler/SchedulerTestingUtils.java   |   1 +
 .../api/environment/CheckpointConfig.java          |  30 ++++
 .../environment/ExecutionCheckpointingOptions.java |  17 ++
 .../api/graph/StreamingJobGraphGenerator.java      |   2 +
 .../JobMasterStopWithSavepointITCase.java          |   1 +
 .../jobmaster/JobMasterTriggerSavepointITCase.java |   1 +
 .../checkpointing/IgnoreInFlightDataITCase.java    | 189 +++++++++++++++++++++
 23 files changed, 508 insertions(+), 22 deletions(-)
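
The user-facing entry points added here are CheckpointConfig#setCheckpointIdOfIgnoredInFlightData
and the execution.checkpointing.id-of-ignored-in-flight-data option (default -1, i.e. disabled).
A minimal sketch of how a job opts in, mirroring the IgnoreInFlightDataITCase added below (the
checkpoint id 1 and the checkpointing settings are illustrative, not prescribed by the commit):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10);                             // checkpoint every 10 ms
    env.getCheckpointConfig().enableUnalignedCheckpoints();  // in-flight data is persisted
    // When the job later recovers from checkpoint 1, its persisted in-flight (channel) data is dropped.
    env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);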

diff --git a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
index c9786ab..88dfc04 100644
--- a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
@@ -21,6 +21,12 @@
             <td>Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status <code class="highlighter-rouge">JobStatus#FAILED</code> or <code class="highlighter-rouge">JobStatus#SUSPENDED</code>. In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.<br /><br />The mode defines how an externalized checkpoint should [...]
         </tr>
         <tr>
+            <td><h5>execution.checkpointing.id-of-ignored-in-flight-data</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>Long</td>
+            <td>Checkpoint id for which in-flight data should be ignored when recovering from this checkpoint.<br /><br />It is better to keep this value at its default (-1) unless there is an explicit need to restore from a specific checkpoint without in-flight data.<br /></td>
+        </tr>
+        <tr>
             <td><h5>execution.checkpointing.interval</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Duration</td>
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 4ae173a..4044bf5 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -101,6 +101,7 @@ public class JMXJobManagerMetricTest extends TestLogger {
                                     true,
                                     false,
                                     false,
+                                    0,
                                     0),
                             null);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index b31dd8f..c5c1e7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -206,6 +206,9 @@ public class CheckpointCoordinator {
 
     private boolean isPreferCheckpointForRecovery;
 
+    /** Id of checkpoint for which in-flight data should be ignored on recovery. */
+    private final long checkpointIdOfIgnoredInFlightData;
+
     private final CheckpointFailureManager failureManager;
 
     private final Clock clock;
@@ -309,6 +312,7 @@ public class CheckpointCoordinator {
         this.isExactlyOnceMode = chkConfig.isExactlyOnce();
         this.unalignedCheckpointsEnabled = chkConfig.isUnalignedCheckpointsEnabled();
         this.alignmentTimeout = chkConfig.getAlignmentTimeout();
+        this.checkpointIdOfIgnoredInFlightData = chkConfig.getCheckpointIdOfIgnoredInFlightData();
 
         this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
         this.masterHooks = new HashMap<>();
@@ -1553,7 +1557,7 @@ public class CheckpointCoordinator {
             LOG.info("Restoring job {} from {}.", job, latest);
 
             // re-assign the task states
-            final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
+            final Map<OperatorID, OperatorState> operatorStates = extractOperatorStates(latest);
 
             StateAssignmentOperation stateAssignmentOperation =
                     new StateAssignmentOperation(
@@ -1595,6 +1599,33 @@ public class CheckpointCoordinator {
         }
     }
 
+    private Map<OperatorID, OperatorState> extractOperatorStates(CompletedCheckpoint checkpoint) {
+        Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
+
+        if (checkpoint.getCheckpointID() == checkpointIdOfIgnoredInFlightData) {
+            // rewrite the operator state with empty in-flight data.
+            for (OperatorState operatorState : operatorStates.values()) {
+                for (Map.Entry<Integer, OperatorSubtaskState> subtaskStateEntry :
+                        operatorState.getSubtaskStates().entrySet()) {
+
+                    OperatorSubtaskState subtaskState = subtaskStateEntry.getValue();
+                    if (!subtaskState.getResultSubpartitionState().isEmpty()
+                            || !subtaskState.getInputChannelState().isEmpty()) {
+                        operatorState.putState(
+                                subtaskStateEntry.getKey(),
+                                subtaskState
+                                        .toBuilder()
+                                        .setResultSubpartitionState(StateObjectCollection.empty())
+                                        .setInputChannelState(StateObjectCollection.empty())
+                                        .build());
+                    }
+                }
+            }
+        }
+
+        return operatorStates;
+    }
+
     /**
      * Restore the state with given savepoint.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index b6a9f64..9d0739a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -307,6 +307,18 @@ public class OperatorSubtaskState implements CompositeStateHandle {
                 || resultSubpartitionState.hasState();
     }
 
+    public Builder toBuilder() {
+        return builder()
+                .setManagedKeyedState(managedKeyedState)
+                .setManagedOperatorState(managedOperatorState)
+                .setRawOperatorState(rawOperatorState)
+                .setRawKeyedState(rawKeyedState)
+                .setInputChannelState(inputChannelState)
+                .setResultSubpartitionState(resultSubpartitionState)
+                .setInputRescalingDescriptor(inputRescalingDescriptor)
+                .setOutputRescalingDescriptor(outputRescalingDescriptor);
+    }
+
     public static Builder builder() {
         return new Builder();
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index f679072..0fd6f87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -64,6 +64,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
 
     private final long alignmentTimeout;
 
+    private final long checkpointIdOfIgnoredInFlightData;
+
     /** @deprecated use {@link #builder()}. */
     @Deprecated
     @VisibleForTesting
@@ -76,7 +78,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
             boolean isExactlyOnce,
             boolean isUnalignedCheckpoint,
             boolean isPreferCheckpointForRecovery,
-            int tolerableCpFailureNumber) {
+            int tolerableCpFailureNumber,
+            long checkpointIdOfIgnoredInFlightData) {
         this(
                 checkpointInterval,
                 checkpointTimeout,
@@ -87,7 +90,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
                 isPreferCheckpointForRecovery,
                 tolerableCpFailureNumber,
                 isUnalignedCheckpoint,
-                0);
+                0,
+                checkpointIdOfIgnoredInFlightData);
     }
 
     private CheckpointCoordinatorConfiguration(
@@ -100,7 +104,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
             boolean isPreferCheckpointForRecovery,
             int tolerableCpFailureNumber,
             boolean isUnalignedCheckpointsEnabled,
-            long alignmentTimeout) {
+            long alignmentTimeout,
+            long checkpointIdOfIgnoredInFlightData) {
 
         // sanity checks
         if (checkpointInterval < MINIMAL_CHECKPOINT_TIME
@@ -124,6 +129,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
         this.tolerableCheckpointFailureNumber = tolerableCpFailureNumber;
         this.isUnalignedCheckpointsEnabled = isUnalignedCheckpointsEnabled;
         this.alignmentTimeout = alignmentTimeout;
+        this.checkpointIdOfIgnoredInFlightData = checkpointIdOfIgnoredInFlightData;
     }
 
     public long getCheckpointInterval() {
@@ -166,6 +172,10 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
         return alignmentTimeout;
     }
 
+    public long getCheckpointIdOfIgnoredInFlightData() {
+        return checkpointIdOfIgnoredInFlightData;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -183,7 +193,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
                 && isUnalignedCheckpointsEnabled == that.isUnalignedCheckpointsEnabled
                 && checkpointRetentionPolicy == that.checkpointRetentionPolicy
                 && isPreferCheckpointForRecovery == that.isPreferCheckpointForRecovery
-                && tolerableCheckpointFailureNumber == that.tolerableCheckpointFailureNumber;
+                && tolerableCheckpointFailureNumber == that.tolerableCheckpointFailureNumber
+                && checkpointIdOfIgnoredInFlightData == that.checkpointIdOfIgnoredInFlightData;
     }
 
     @Override
@@ -197,7 +208,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
                 isExactlyOnce,
                 isUnalignedCheckpointsEnabled,
                 isPreferCheckpointForRecovery,
-                tolerableCheckpointFailureNumber);
+                tolerableCheckpointFailureNumber,
+                checkpointIdOfIgnoredInFlightData);
     }
 
     @Override
@@ -221,6 +233,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
                 + isPreferCheckpointForRecovery
                 + ", tolerableCheckpointFailureNumber="
                 + tolerableCheckpointFailureNumber
+                + ", checkpointIdOfIgnoredInFlightData="
+                + checkpointIdOfIgnoredInFlightData
                 + '}';
     }
 
@@ -241,6 +255,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
         private int tolerableCheckpointFailureNumber;
         private boolean isUnalignedCheckpointsEnabled;
         private long alignmentTimeout = 0;
+        private long checkpointIdOfIgnoredInFlightData;
 
         public CheckpointCoordinatorConfiguration build() {
             return new CheckpointCoordinatorConfiguration(
@@ -253,7 +268,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
                     isPreferCheckpointForRecovery,
                     tolerableCheckpointFailureNumber,
                     isUnalignedCheckpointsEnabled,
-                    alignmentTimeout);
+                    alignmentTimeout,
+                    checkpointIdOfIgnoredInFlightData);
         }
 
         public CheckpointCoordinatorConfigurationBuilder setCheckpointInterval(
@@ -314,5 +330,11 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
             this.alignmentTimeout = alignmentTimeout;
             return this;
         }
+
+        public CheckpointCoordinatorConfigurationBuilder setCheckpointIdOfIgnoredInFlightData(
+                long checkpointIdOfIgnoredInFlightData) {
+            this.checkpointIdOfIgnoredInFlightData = checkpointIdOfIgnoredInFlightData;
+            return this;
+        }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 0dd7104..86337f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -474,6 +474,7 @@ public class CheckpointCoordinatorMasterHooksTest {
                         true,
                         false,
                         false,
+                        0,
                         0);
         Executor executor = Executors.directExecutor();
         return new CheckpointCoordinator(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 4bc02b6..6559166 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -59,11 +59,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static java.util.Collections.singletonList;
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.compareKeyedState;
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.comparePartitionableState;
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateChainedPartitionableStateHandle;
@@ -71,6 +73,8 @@ import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUt
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle;
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockSubtaskState;
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.verifyStateRestore;
+import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
+import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -283,8 +287,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
             long checkpointId = checkpointIDCounter.getLast();
 
             KeyGroupRange keyGroupRange = KeyGroupRange.of(0, 0);
-            List<SerializableObject> testStates =
-                    Collections.singletonList(new SerializableObject());
+            List<SerializableObject> testStates = singletonList(new SerializableObject());
             KeyedStateHandle serializedKeyGroupStates =
                     generateKeyGroupState(keyGroupRange, testStates);
 
@@ -319,7 +322,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
             KeyGroupRange keyGroupRangeForSavepoint = KeyGroupRange.of(1, 1);
             List<SerializableObject> testStatesForSavepoint =
-                    Collections.singletonList(new SerializableObject());
+                    singletonList(new SerializableObject());
             KeyedStateHandle serializedKeyGroupStatesForSavepoint =
                     generateKeyGroupState(keyGroupRangeForSavepoint, testStatesForSavepoint);
 
@@ -441,6 +444,9 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                             .setManagedOperatorState(opStateBackend)
                             .setManagedKeyedState(keyedStateBackend)
                             .setRawKeyedState(keyedStateRaw)
+                            .setInputChannelState(
+                                    StateObjectCollection.singleton(
+                                            createNewInputChannelStateHandle(3, new Random())))
                             .build();
             TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
             taskOperatorSubtaskStates.putSubtaskStateByOperatorID(
@@ -474,10 +480,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                     generatePartitionableStateHandle(jobVertexID2, index, 2, 8, false);
             OperatorStateHandle opStateRaw =
                     generatePartitionableStateHandle(jobVertexID2, index, 2, 8, true);
-            expectedOpStatesBackend.add(
-                    new ChainedStateHandle<>(Collections.singletonList(opStateBackend)));
-            expectedOpStatesRaw.add(
-                    new ChainedStateHandle<>(Collections.singletonList(opStateRaw)));
+            expectedOpStatesBackend.add(new ChainedStateHandle<>(singletonList(opStateBackend)));
+            expectedOpStatesRaw.add(new ChainedStateHandle<>(singletonList(opStateRaw)));
 
             OperatorSubtaskState operatorSubtaskState =
                     OperatorSubtaskState.builder()
@@ -574,11 +578,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                 if (idx == headOpIndex) {
                     Collection<KeyedStateHandle> keyedStateBackend = opState.getManagedKeyedState();
                     Collection<KeyedStateHandle> keyGroupStateRaw = opState.getRawKeyedState();
-                    compareKeyedState(
-                            Collections.singletonList(originalKeyedStateBackend),
-                            keyedStateBackend);
-                    compareKeyedState(
-                            Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw);
+                    compareKeyedState(singletonList(originalKeyedStateBackend), keyedStateBackend);
+                    compareKeyedState(singletonList(originalKeyedStateRaw), keyGroupStateRaw);
                 }
             }
             actualOpStatesBackend.add(allParallelManagedOpStates);
@@ -1032,13 +1033,122 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
             Collection<KeyedStateHandle> keyedStateBackend = headOpState.getManagedKeyedState();
             Collection<KeyedStateHandle> keyGroupStateRaw = headOpState.getRawKeyedState();
 
-            compareKeyedState(
-                    Collections.singletonList(originalKeyedStateBackend), keyedStateBackend);
-            compareKeyedState(Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw);
+            compareKeyedState(singletonList(originalKeyedStateBackend), keyedStateBackend);
+            compareKeyedState(singletonList(originalKeyedStateRaw), keyGroupStateRaw);
         }
 
         comparePartitionableState(
                 expectedManagedOperatorStates.get(0), actualManagedOperatorStates);
         comparePartitionableState(expectedRawOperatorStates.get(0), actualRawOperatorStates);
     }
+
+    @Test
+    public void testRestoreLatestCheckpointedStateWithoutInFlightData() throws Exception {
+        // given: Operator with non-empty state.
+        final JobVertexID jobVertexID = new JobVertexID();
+        int parallelism1 = 3;
+        int maxParallelism1 = 42;
+
+        CompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore();
+
+        final ExecutionGraph graph =
+                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID, parallelism1, maxParallelism1)
+                        .build();
+
+        final ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
+
+        // set up the coordinator and validate the initial state
+        CheckpointCoordinator coord =
+                new CheckpointCoordinatorBuilder()
+                        .setExecutionGraph(graph)
+                        .setCompletedCheckpointStore(completedCheckpointStore)
+                        .setCheckpointCoordinatorConfiguration(
+                                new CheckpointCoordinatorConfigurationBuilder()
+                                        .setCheckpointIdOfIgnoredInFlightData(1)
+                                        .build())
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .build();
+
+        // trigger the checkpoint
+        coord.triggerCheckpoint(false);
+        manuallyTriggeredScheduledExecutor.triggerAll();
+
+        assertEquals(1, coord.getPendingCheckpoints().size());
+        long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+
+        List<KeyGroupRange> keyGroupPartitions1 =
+                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
+
+        Random random = new Random();
+        // fill the states and complete the checkpoint.
+        for (int index = 0; index < jobVertex.getParallelism(); index++) {
+            OperatorSubtaskState operatorSubtaskState =
+                    OperatorSubtaskState.builder()
+                            .setManagedOperatorState(
+                                    generatePartitionableStateHandle(
+                                            jobVertexID, index, 2, 8, false))
+                            .setRawOperatorState(
+                                    generatePartitionableStateHandle(
+                                            jobVertexID, index, 2, 8, true))
+                            .setManagedKeyedState(
+                                    generateKeyGroupState(
+                                            jobVertexID, keyGroupPartitions1.get(index), false))
+                            .setRawKeyedState(
+                                    generateKeyGroupState(
+                                            jobVertexID, keyGroupPartitions1.get(index), true))
+                            .setInputChannelState(
+                                    StateObjectCollection.singleton(
+                                            createNewInputChannelStateHandle(3, random)))
+                            .setResultSubpartitionState(
+                                    StateObjectCollection.singleton(
+                                            createNewResultSubpartitionStateHandle(3, random)))
+                            .build();
+            TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
+            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(
+                    OperatorID.fromJobVertexID(jobVertexID), operatorSubtaskState);
+
+            AcknowledgeCheckpoint acknowledgeCheckpoint =
+                    new AcknowledgeCheckpoint(
+                            graph.getJobID(),
+                            jobVertex
+                                    .getTaskVertices()[index]
+                                    .getCurrentExecutionAttempt()
+                                    .getAttemptId(),
+                            checkpointId,
+                            new CheckpointMetrics(),
+                            taskOperatorSubtaskStates);
+
+            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
+        }
+
+        assertEquals(1, coord.getSuccessfulCheckpoints().size());
+
+        // when: Restore latest checkpoint without in-flight data.
+        Set<ExecutionJobVertex> tasks = new HashSet<>();
+        tasks.add(jobVertex);
+        assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
+
+        // then: All states should be restored successfully except InputChannel and
+        // ResultSubpartition which should be ignored.
+        verifyStateRestore(jobVertexID, jobVertex, keyGroupPartitions1);
+        for (int i = 0; i < jobVertex.getParallelism(); i++) {
+            JobManagerTaskRestore taskRestore =
+                    jobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
+            Assert.assertEquals(1L, taskRestore.getRestoreCheckpointId());
+            TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
+
+            OperatorSubtaskState operatorState =
+                    stateSnapshot.getSubtaskStateByOperatorID(
+                            OperatorID.fromJobVertexID(jobVertexID));
+
+            assertTrue(operatorState.getInputChannelState().isEmpty());
+            assertTrue(operatorState.getResultSubpartitionState().isEmpty());
+
+            assertFalse(operatorState.getRawOperatorState().isEmpty());
+            assertFalse(operatorState.getManagedOperatorState().isEmpty());
+            assertFalse(operatorState.getRawKeyedState().isEmpty());
+            assertFalse(operatorState.getManagedKeyedState().isEmpty());
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index d0de6d1..9950f2c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -86,6 +86,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
                                 true,
                                 false,
                                 false,
+                                0,
                                 0),
                         new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath)),
                         new SerializedValue<CheckpointStorage>(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 3c134cb..927b809 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -60,6 +60,7 @@ public class CheckpointStatsTrackerTest {
                                 false,
                                 false,
                                 false,
+                                0,
                                 0),
                         null);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index d53be67..50c7dd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -145,6 +145,7 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger {
                         true,
                         false,
                         false,
+                        0,
                         0);
         final JobCheckpointingSettings checkpointingSettings =
                 new JobCheckpointingSettings(chkConfig, null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
index 9396ef0..ee71761 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
@@ -70,6 +70,7 @@ public class FailoverStrategyCheckpointCoordinatorTest extends TestLogger {
                         true,
                         false,
                         false,
+                        0,
                         0);
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinator(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java
index 274e7fd..a764c76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java
@@ -19,16 +19,28 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Random;
+
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.builder.EqualsBuilder.reflectionEquals;
+import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateKeyGroupState;
+import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle;
+import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
+import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /** {@link OperatorSubtaskState} test. */
 public class OperatorSubtaskStateTest {
@@ -50,6 +62,48 @@ public class OperatorSubtaskStateTest {
                 .discardState();
     }
 
+    @Test
+    public void testToBuilderCorrectness() throws IOException {
+        // given: Initialized operator subtask state.
+        JobVertexID jobVertexID = new JobVertexID();
+        int index = 0;
+        Random random = new Random();
+
+        OperatorSubtaskState operatorSubtaskState =
+                OperatorSubtaskState.builder()
+                        .setManagedOperatorState(
+                                generatePartitionableStateHandle(jobVertexID, index, 2, 8, false))
+                        .setRawOperatorState(
+                                generatePartitionableStateHandle(jobVertexID, index, 2, 8, true))
+                        .setManagedKeyedState(
+                                generateKeyGroupState(jobVertexID, new KeyGroupRange(0, 11), false))
+                        .setRawKeyedState(
+                                generateKeyGroupState(jobVertexID, new KeyGroupRange(0, 9), true))
+                        .setInputChannelState(
+                                StateObjectCollection.singleton(
+                                        createNewInputChannelStateHandle(3, random)))
+                        .setResultSubpartitionState(
+                                StateObjectCollection.singleton(
+                                        createNewResultSubpartitionStateHandle(3, random)))
+                        .setInputRescalingDescriptor(
+                                new InflightDataRescalingDescriptor(
+                                        new int[1],
+                                        new RescaleMappings[0],
+                                        Collections.singleton(1)))
+                        .setOutputRescalingDescriptor(
+                                new InflightDataRescalingDescriptor(
+                                        new int[1],
+                                        new RescaleMappings[0],
+                                        Collections.singleton(2)))
+                        .build();
+
+        // when: Copy the operator subtask state.
+        OperatorSubtaskState operatorSubtaskStateCopy = operatorSubtaskState.toBuilder().build();
+
+        // then: It should be equal to original one.
+        assertTrue(reflectionEquals(operatorSubtaskState, operatorSubtaskStateCopy));
+    }
+
     private ResultSubpartitionStateHandle buildSubpartitionHandle(
             StreamStateHandle delegate, int subPartitionIdx1) {
         return new ResultSubpartitionStateHandle(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 3292579..073d214 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -101,6 +101,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
                         true,
                         false,
                         false,
+                        0,
                         0);
         JobCheckpointingSettings checkpointingSettings =
                 new JobCheckpointingSettings(chkConfig, null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
index ec722d8..3bbd94f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
@@ -659,6 +659,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger {
                                 false,
                                 false,
                                 false,
+                                0,
                                 0),
                         null));
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index d779c99..dd4424b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -398,6 +398,7 @@ public class JobGraphTest extends TestLogger {
                         true,
                         false,
                         false,
+                        0,
                         0);
 
         return new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
index 3677785..8a7a738 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -45,6 +45,7 @@ public class JobCheckpointingSettingsTest {
                                 false,
                                 false,
                                 false,
+                                0,
                                 0),
                         new SerializedValue<>(new MemoryStateBackend()));
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index ac5b1cd..80e8652c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -162,6 +162,7 @@ public class SchedulerTestingUtils {
                         false,
                         false,
                         false,
+                        0,
                         0);
 
         SerializedValue<StateBackend> serializedStateBackend = null;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 76abc50..02ee15b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -91,6 +91,9 @@ public class CheckpointConfig implements java.io.Serializable {
     /** Flag to enable unaligned checkpoints. */
     private boolean unalignedCheckpointsEnabled;
 
+    /** Id of checkpoint for which in-flight data should be ignored on recovery. */
+    private long checkpointIdOfIgnoredInFlightData;
+
     private Duration alignmentTimeout =
             ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT.defaultValue();
 
@@ -148,6 +151,8 @@ public class CheckpointConfig implements java.io.Serializable {
         this.forceCheckpointing = checkpointConfig.forceCheckpointing;
         this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
         this.storage = checkpointConfig.getCheckpointStorage();
+        this.checkpointIdOfIgnoredInFlightData =
+                checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
     }
 
     public CheckpointConfig() {}
@@ -668,6 +673,28 @@ public class CheckpointConfig implements java.io.Serializable {
         return this.storage;
     }
 
+    /**
+     * Sets the checkpoint id for which in-flight data will be ignored for all operators when
+     * recovering from this checkpoint.
+     *
+     * @param checkpointIdOfIgnoredInFlightData Checkpoint id for which in-flight data should be
+     *     ignored.
+     * @see #getCheckpointIdOfIgnoredInFlightData
+     */
+    @PublicEvolving
+    public void setCheckpointIdOfIgnoredInFlightData(long checkpointIdOfIgnoredInFlightData) {
+        this.checkpointIdOfIgnoredInFlightData = checkpointIdOfIgnoredInFlightData;
+    }
+
+    /**
+     * @return Checkpoint id for which in-flight data should be ignored.
+     * @see #setCheckpointIdOfIgnoredInFlightData
+     */
+    @PublicEvolving
+    public long getCheckpointIdOfIgnoredInFlightData() {
+        return checkpointIdOfIgnoredInFlightData;
+    }
+
     /** Cleanup behaviour for externalized checkpoints when the job is cancelled. */
     @PublicEvolving
     public enum ExternalizedCheckpointCleanup {
@@ -750,6 +777,9 @@ public class CheckpointConfig implements java.io.Serializable {
                 .getOptional(ExecutionCheckpointingOptions.ENABLE_UNALIGNED)
                 .ifPresent(this::enableUnalignedCheckpoints);
         configuration
+                .getOptional(ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA)
+                .ifPresent(this::setCheckpointIdOfIgnoredInFlightData);
+        configuration
                 .getOptional(ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT)
                 .ifPresent(this::setAlignmentTimeout);
         configuration
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
index 7cdd194..40c5a6e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
@@ -196,4 +196,21 @@ public class ExecutionCheckpointingOptions {
                                     .text(
                                             "Forces unaligned checkpoints, particularly allowing them for iterative jobs.")
                                     .build());
+
+    public static final ConfigOption<Long> CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
+            ConfigOptions.key("execution.checkpointing.id-of-ignored-in-flight-data")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Checkpoint id for which in-flight data should be ignored when recovering from this checkpoint.")
+                                    .linebreak()
+                                    .linebreak()
+                                    .text(
+                                            "It is better to keep this value at its default (-1) "
+                                                    + "unless there is an explicit need to restore from "
+                                                    + "a specific checkpoint without in-flight data.")
+                                    .linebreak()
+                                    .build());
 }
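
Equivalently, a sketch of supplying the new option through a Configuration, assuming the
environment is created from that configuration so that CheckpointConfig#configure (extended in
the CheckpointConfig diff above) picks the value up; the id 42 is illustrative:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration conf = new Configuration();
    // Same key as execution.checkpointing.id-of-ignored-in-flight-data; -1 (the default) disables it.
    conf.set(ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA, 42L);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
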
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 425350a..f89641d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -1312,6 +1312,8 @@ public class StreamingJobGraphGenerator {
                                 .setTolerableCheckpointFailureNumber(
                                         cfg.getTolerableCheckpointFailureNumber())
                                 .setUnalignedCheckpointsEnabled(cfg.isUnalignedCheckpointsEnabled())
+                                .setCheckpointIdOfIgnoredInFlightData(
+                                        cfg.getCheckpointIdOfIgnoredInFlightData())
                                 .setAlignmentTimeout(cfg.getAlignmentTimeout().toMillis())
                                 .build(),
                         serializedStateBackend,
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
index 4820eb0..97c8f01 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
@@ -297,6 +297,7 @@ public class JobMasterStopWithSavepointITCase extends AbstractTestBase {
                                 true,
                                 false,
                                 false,
+                                0,
                                 0),
                         null);
 
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index 39ea862..6cb55a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -108,6 +108,7 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase {
                                 true,
                                 false,
                                 false,
+                                0,
                                 0),
                         null);
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
new file mode 100644
index 0000000..a8bdf6c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.concurrent.locks.LockSupport;
+
+import static java.util.Collections.singletonList;
+import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertEquals;
+
+/** Test of ignoring in-flight data during recovery. */
+public class IgnoreInFlightDataITCase extends TestLogger {
+    @ClassRule
+    public static final MiniClusterWithClientResource CLUSTER =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getConfiguration())
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(2)
+                            .build());
+
+    private static Configuration getConfiguration() {
+        Configuration config = new Configuration();
+        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("48m"));
+        return config;
+    }
+
+    @Test
+    public void testIgnoreInFlightDataDuringRecovery() throws Exception {
+        // given: Stream which will fail after first checkpoint.
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(3);
+        env.enableCheckpointing(10);
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+        env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
+        env.setRestartStrategy(fixedDelayRestart(2, 0));
+
+        env.addSource(new NumberSource())
+                .shuffle()
+                // map step to keep execution parallel.
+                .map(new SlowMap())
+                .addSink(new SumFailSink())
+                // a single sink instance so the total can be summed in one place.
+                .setParallelism(1);
+
+        // when: Job is executed.
+        env.execute("Total sum");
+
+        // Calculate the expected single value after recovery.
+        int sourceValueAfterRestore = NumberSource.lastCheckpointedValue + 1;
+
+        // Calculate result in case of normal recovery.
+        long resultWithoutIgnoringData = 0;
+        for (int i = 0; i <= sourceValueAfterRestore; i++) {
+            resultWithoutIgnoringData += i;
+        }
+
+        // then: Actual result should be less than the ideal result because some of the data was
+        // ignored.
+        assertThat(SumFailSink.result, lessThan(resultWithoutIgnoringData));
+
+        // and: Actual result should equal the result captured before the failure plus the single
+        // source value emitted after recovery.
+        long expectedResult = SumFailSink.resultBeforeFail + sourceValueAfterRestore;
+        assertEquals(expectedResult, SumFailSink.result);
+    }
+
+    private static class SumFailSink implements SinkFunction<Integer>, CheckpointedFunction {
+        public static long result;
+        public static long resultBeforeFail;
+
+        @Override
+        public void invoke(Integer value) throws Exception {
+            result += value;
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            resultBeforeFail = result;
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            result = resultBeforeFail;
+        }
+    }
+
+    private static class NumberSource implements SourceFunction<Integer>, CheckpointedFunction {
+
+        private static final long serialVersionUID = 1L;
+        private ListState<Integer> valueState;
+        public static int lastCheckpointedValue;
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            Iterator<Integer> stateIt = valueState.get().iterator();
+            boolean isRecovered = stateIt.hasNext();
+
+            if (isRecovered) {
+                Integer lastValue = stateIt.next();
+
+                // Checking that ListState is recovered correctly.
+                assertEquals(lastCheckpointedValue, lastValue.intValue());
+
+                // if it is started after recovery, just send one more value and finish.
+                ctx.collect(lastValue + 1);
+            } else {
+                int next = 0;
+                while (true) {
+                    synchronized (ctx.getCheckpointLock()) {
+                        next++;
+                        valueState.update(singletonList(next));
+                        ctx.collect(next);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {}
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            if (lastCheckpointedValue > 0) {
+                throw new RuntimeException("Error during snapshot");
+            }
+
+            lastCheckpointedValue = valueState.get().iterator().next();
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            this.valueState =
+                    context.getOperatorStateStore()
+                            .getListState(new ListStateDescriptor<>("state", Types.INT));
+        }
+    }
+
+    private static class SlowMap extends RichMapFunction<Integer, Integer> {
+
+        @Override
+        public Integer map(Integer value) throws Exception {
+            // slow down the map in order to have more intermediate data.
+            LockSupport.parkNanos(100000);
+            return value;
+        }
+    }
+}