You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/05/25 22:21:23 UTC

[flink] 02/02: fixup: Create the new operator states instead of changing old one

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bbc94d750dbdc64aee7cb78de010a5943e14e637
Author: anton <ka...@yandex.ru>
AuthorDate: Fri May 21 13:34:57 2021 +0200

    fixup: Create the new operator states instead of changing old one
---
 .../execution_checkpointing_configuration.html     | 12 ++---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 51 +++++++++++++---------
 .../api/environment/CheckpointConfig.java          |  6 ++-
 .../environment/ExecutionCheckpointingOptions.java |  2 +-
 4 files changed, 42 insertions(+), 29 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
index 88dfc04..90b6e9bd 100644
--- a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
@@ -21,12 +21,6 @@
             <td>Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status <code class="highlighter-rouge">JobStatus#FAILED</code> or <code class="highlighter-rouge">JobStatus#SUSPENDED</code>. In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.<br /><br />The mode defines how an externalized checkpoint should [...]
         </tr>
         <tr>
-            <td><h5>execution.checkpointing.id-of-ignored-in-flight-data</h5></td>
-            <td style="word-wrap: break-word;">-1</td>
-            <td>Long</td>
-            <td>Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.<br /><br />It is better to keep this value empty until there is explicit needs to restore from the specific checkpoint without in-flight data.<br /></td>
-        </tr>
-        <tr>
             <td><h5>execution.checkpointing.interval</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Duration</td>
@@ -57,6 +51,12 @@
             <td>If enabled, a job recovery should fallback to checkpoint when there is a more recent savepoint.</td>
         </tr>
         <tr>
+            <td><h5>execution.checkpointing.recover-without-channel-state.checkpoint-id</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>Long</td>
+            <td>Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.<br /><br />It is better to keep this value empty until there is an explicit need to restore from a specific checkpoint without in-flight data.<br /></td>
+        </tr>
+        <tr>
             <td><h5>execution.checkpointing.timeout</h5></td>
             <td style="word-wrap: break-word;">10 min</td>
             <td>Duration</td>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index c5c1e7e..90d94e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1600,30 +1600,39 @@ public class CheckpointCoordinator {
     }
 
     private Map<OperatorID, OperatorState> extractOperatorStates(CompletedCheckpoint checkpoint) {
-        Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
-
-        if (checkpoint.getCheckpointID() == checkpointIdOfIgnoredInFlightData) {
-            // rewrite the operator state with empty in-flight data.
-            for (OperatorState operatorState : operatorStates.values()) {
-                for (Map.Entry<Integer, OperatorSubtaskState> subtaskStateEntry :
-                        operatorState.getSubtaskStates().entrySet()) {
-
-                    OperatorSubtaskState subtaskState = subtaskStateEntry.getValue();
-                    if (!subtaskState.getResultSubpartitionState().isEmpty()
-                            || !subtaskState.getInputChannelState().isEmpty()) {
-                        operatorState.putState(
-                                subtaskStateEntry.getKey(),
-                                subtaskState
-                                        .toBuilder()
-                                        .setResultSubpartitionState(StateObjectCollection.empty())
-                                        .setInputChannelState(StateObjectCollection.empty())
-                                        .build());
-                    }
-                }
+        Map<OperatorID, OperatorState> originalOperatorStates = checkpoint.getOperatorStates();
+
+        if (checkpoint.getCheckpointID() != checkpointIdOfIgnoredInFlightData) {
+            // Don't make any changes if they are not required.
+            return originalOperatorStates;
+        }
+
+        HashMap<OperatorID, OperatorState> newStates = new HashMap<>();
+        // Create the new operator states without in-flight data.
+        for (OperatorState originalOperatorState : originalOperatorStates.values()) {
+            OperatorState newState =
+                    new OperatorState(
+                            originalOperatorState.getOperatorID(),
+                            originalOperatorState.getParallelism(),
+                            originalOperatorState.getMaxParallelism());
+
+            newStates.put(newState.getOperatorID(), newState);
+
+            for (Map.Entry<Integer, OperatorSubtaskState> originalSubtaskStateEntry :
+                    originalOperatorState.getSubtaskStates().entrySet()) {
+
+                newState.putState(
+                        originalSubtaskStateEntry.getKey(),
+                        originalSubtaskStateEntry
+                                .getValue()
+                                .toBuilder()
+                                .setResultSubpartitionState(StateObjectCollection.empty())
+                                .setInputChannelState(StateObjectCollection.empty())
+                                .build());
             }
         }
 
-        return operatorStates;
+        return newStates;
     }
 
     /**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 02ee15b..a087a39 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -65,6 +65,9 @@ public class CheckpointConfig implements java.io.Serializable {
 
     public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
 
+    /** Default id of checkpoint for which in-flight data should be ignored on recovery. */
+    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = -1;
+
     // ------------------------------------------------------------------------
 
     /** Checkpointing mode (exactly-once vs. at-least-once). */
@@ -92,7 +95,8 @@ public class CheckpointConfig implements java.io.Serializable {
     private boolean unalignedCheckpointsEnabled;
 
     /** Id of checkpoint for which in-flight data should be ignored on recovery. */
-    private long checkpointIdOfIgnoredInFlightData;
+    private long checkpointIdOfIgnoredInFlightData =
+            DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA;
 
     private Duration alignmentTimeout =
             ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT.defaultValue();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
index 40c5a6e..1c9d97a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
@@ -198,7 +198,7 @@ public class ExecutionCheckpointingOptions {
                                     .build());
 
     public static final ConfigOption<Long> CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
-            ConfigOptions.key("execution.checkpointing.id-of-ignored-in-flight-data")
+            ConfigOptions.key("execution.checkpointing.recover-without-channel-state.checkpoint-id")
                     .longType()
                     .defaultValue(-1L)
                     .withDescription(