You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original archive page and was lost in this plain-text copy.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/05/25 22:21:23 UTC
[flink] 02/02: fixup: Create the new operator states instead of changing old one
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bbc94d750dbdc64aee7cb78de010a5943e14e637
Author: anton <ka...@yandex.ru>
AuthorDate: Fri May 21 13:34:57 2021 +0200
fixup: Create the new operator states instead of changing old one
---
.../execution_checkpointing_configuration.html | 12 ++---
.../runtime/checkpoint/CheckpointCoordinator.java | 51 +++++++++++++---------
.../api/environment/CheckpointConfig.java | 6 ++-
.../environment/ExecutionCheckpointingOptions.java | 2 +-
4 files changed, 42 insertions(+), 29 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
index 88dfc04..90b6e9bd 100644
--- a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
@@ -21,12 +21,6 @@
<td>Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status <code class="highlighter-rouge">JobStatus#FAILED</code> or <code class="highlighter-rouge">JobStatus#SUSPENDED</code>. In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.<br /><br />The mode defines how an externalized checkpoint should [...]
</tr>
<tr>
- <td><h5>execution.checkpointing.id-of-ignored-in-flight-data</h5></td>
- <td style="word-wrap: break-word;">-1</td>
- <td>Long</td>
- <td>Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.<br /><br />It is better to keep this value empty until there is explicit needs to restore from the specific checkpoint without in-flight data.<br /></td>
- </tr>
- <tr>
<td><h5>execution.checkpointing.interval</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
@@ -57,6 +51,12 @@
<td>If enabled, a job recovery should fallback to checkpoint when there is a more recent savepoint.</td>
</tr>
<tr>
+ <td><h5>execution.checkpointing.recover-without-channel-state.checkpoint-id</h5></td>
+ <td style="word-wrap: break-word;">-1</td>
+ <td>Long</td>
+ <td>Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.<br /><br />It is better to keep this value empty until there is explicit needs to restore from the specific checkpoint without in-flight data.<br /></td>
+ </tr>
+ <tr>
<td><h5>execution.checkpointing.timeout</h5></td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index c5c1e7e..90d94e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1600,30 +1600,39 @@ public class CheckpointCoordinator {
}
private Map<OperatorID, OperatorState> extractOperatorStates(CompletedCheckpoint checkpoint) {
- Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
-
- if (checkpoint.getCheckpointID() == checkpointIdOfIgnoredInFlightData) {
- // rewrite the operator state with empty in-flight data.
- for (OperatorState operatorState : operatorStates.values()) {
- for (Map.Entry<Integer, OperatorSubtaskState> subtaskStateEntry :
- operatorState.getSubtaskStates().entrySet()) {
-
- OperatorSubtaskState subtaskState = subtaskStateEntry.getValue();
- if (!subtaskState.getResultSubpartitionState().isEmpty()
- || !subtaskState.getInputChannelState().isEmpty()) {
- operatorState.putState(
- subtaskStateEntry.getKey(),
- subtaskState
- .toBuilder()
- .setResultSubpartitionState(StateObjectCollection.empty())
- .setInputChannelState(StateObjectCollection.empty())
- .build());
- }
- }
+ Map<OperatorID, OperatorState> originalOperatorStates = checkpoint.getOperatorStates();
+
+ if (checkpoint.getCheckpointID() != checkpointIdOfIgnoredInFlightData) {
+ // Don't do any changes if it is not required.
+ return originalOperatorStates;
+ }
+
+ HashMap<OperatorID, OperatorState> newStates = new HashMap<>();
+ // Create the new operator states without in-flight data.
+ for (OperatorState originalOperatorState : originalOperatorStates.values()) {
+ OperatorState newState =
+ new OperatorState(
+ originalOperatorState.getOperatorID(),
+ originalOperatorState.getParallelism(),
+ originalOperatorState.getMaxParallelism());
+
+ newStates.put(newState.getOperatorID(), newState);
+
+ for (Map.Entry<Integer, OperatorSubtaskState> originalSubtaskStateEntry :
+ originalOperatorState.getSubtaskStates().entrySet()) {
+
+ newState.putState(
+ originalSubtaskStateEntry.getKey(),
+ originalSubtaskStateEntry
+ .getValue()
+ .toBuilder()
+ .setResultSubpartitionState(StateObjectCollection.empty())
+ .setInputChannelState(StateObjectCollection.empty())
+ .build());
}
}
- return operatorStates;
+ return newStates;
}
/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 02ee15b..a087a39 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -65,6 +65,9 @@ public class CheckpointConfig implements java.io.Serializable {
public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
+ /** Default id of checkpoint for which in-flight data should be ignored on recovery. */
+ public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = -1;
+
// ------------------------------------------------------------------------
/** Checkpointing mode (exactly-once vs. at-least-once). */
@@ -92,7 +95,8 @@ public class CheckpointConfig implements java.io.Serializable {
private boolean unalignedCheckpointsEnabled;
/** Id of checkpoint for which in-flight data should be ignored on recovery. */
- private long checkpointIdOfIgnoredInFlightData;
+ private long checkpointIdOfIgnoredInFlightData =
+ DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA;
private Duration alignmentTimeout =
ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT.defaultValue();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
index 40c5a6e..1c9d97a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
@@ -198,7 +198,7 @@ public class ExecutionCheckpointingOptions {
.build());
public static final ConfigOption<Long> CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
- ConfigOptions.key("execution.checkpointing.id-of-ignored-in-flight-data")
+ ConfigOptions.key("execution.checkpointing.recover-without-channel-state.checkpoint-id")
.longType()
.defaultValue(-1L)
.withDescription(