You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original page and is not reproduced in this plain-text version.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/07/04 07:10:35 UTC
[flink] 04/11: [FLINK-12963] [state-processor] Refactor operator state access concerns into ModifiableSavepointMetadata
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 31e827e54bcf643dc0593227aa6e491061102ccd
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jul 3 13:05:37 2019 +0800
[FLINK-12963] [state-processor] Refactor operator state access concerns into ModifiableSavepointMetadata
---
.../apache/flink/state/api/WritableSavepoint.java | 39 +++++++++-------------
.../metadata/ModifiableSavepointMetadata.java | 29 ++++++++++++++--
2 files changed, 42 insertions(+), 26 deletions(-)
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
index 300d1ae..0f68cce 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
@@ -19,8 +19,10 @@ package org.apache.flink.state.api;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.output.MergeOperatorStates;
import org.apache.flink.state.api.output.SavepointOutputFormat;
@@ -28,7 +30,6 @@ import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
import org.apache.flink.util.Preconditions;
import java.util.List;
-import java.util.stream.Collectors;
/**
* Any savepoint that can be written to from a batch context.
@@ -79,9 +80,10 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
public final void write(String path) {
final Path savepointPath = new Path(path);
- DataSet<OperatorState> newOperatorStates = getOperatorStates(savepointPath);
+ List<Tuple2<OperatorID, BootstrapTransformation<?>>> newOperatorTransformations = metadata.getNewOperatorTransformations();
+ DataSet<OperatorState> newOperatorStates = writeOperatorStates(newOperatorTransformations, savepointPath);
- List<OperatorState> existingOperators = getExistingOperatorStates();
+ List<OperatorState> existingOperators = metadata.getExistingOperators();
DataSet<OperatorState> finalOperatorStates = unionOperatorStates(newOperatorStates, existingOperators);
@@ -92,27 +94,6 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
.name(path);
}
- private List<OperatorState> getExistingOperatorStates() {
- return metadata
- .getOperatorStates()
- .entrySet()
- .stream()
- .filter(entry -> entry.getValue().isLeft())
- .map(entry -> entry.getValue().left())
- .collect(Collectors.toList());
- }
-
- private DataSet<OperatorState> getOperatorStates(Path savepointPath) {
- return metadata
- .getOperatorStates()
- .entrySet()
- .stream()
- .filter(entry -> entry.getValue().isRight())
- .map(entry -> entry.getValue().right().writeOperatorState(entry.getKey(), stateBackend, metadata, savepointPath))
- .reduce(DataSet::union)
- .orElseThrow(() -> new IllegalStateException("Savepoint's must contain at least one operator"));
- }
-
private DataSet<OperatorState> unionOperatorStates(DataSet<OperatorState> newOperatorStates, List<OperatorState> existingOperators) {
DataSet<OperatorState> finalOperatorStates;
if (existingOperators.isEmpty()) {
@@ -126,4 +107,14 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
}
return finalOperatorStates;
}
+
+ private DataSet<OperatorState> writeOperatorStates(
+ List<Tuple2<OperatorID, BootstrapTransformation<?>>> newOperatorTransformations,
+ Path savepointWritePath) {
+ return newOperatorTransformations
+ .stream()
+ .map(transformation -> transformation.f1.writeOperatorState(transformation.f0, stateBackend, metadata, savepointWritePath))
+ .reduce(DataSet::union)
+ .orElseThrow(() -> new IllegalStateException("Savepoint's must contain at least one operator"));
+ }
}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
index 158fa35..8df8723 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
@@ -1,6 +1,7 @@
package org.apache.flink.state.api.runtime.metadata;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -11,7 +12,10 @@ import org.apache.flink.types.Either;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Savepoint metadata that can be modified.
@@ -61,7 +65,28 @@ public class ModifiableSavepointMetadata extends SavepointMetadata {
operatorStateIndex.put(id, Either.Right(transformation));
}
- public Map<OperatorID, Either<OperatorState, BootstrapTransformation<?>>> getOperatorStates() {
- return operatorStateIndex;
+ /**
+ * @return List of {@link OperatorState} that already exists within the savepoint.
+ */
+ public List<OperatorState> getExistingOperators() {
+ return operatorStateIndex
+ .values()
+ .stream()
+ .filter(Either::isLeft)
+ .map(Either::left)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * @return List of new operator states for the savepoint, represented by their target {@link OperatorID} and {@link BootstrapTransformation}.
+ */
+ public List<Tuple2<OperatorID, BootstrapTransformation<?>>> getNewOperatorTransformations() {
+ Stream<Tuple2<OperatorID, BootstrapTransformation<?>>> transformations = operatorStateIndex
+ .entrySet()
+ .stream()
+ .filter(entry -> entry.getValue().isRight())
+ .map(entry -> Tuple2.of(entry.getKey(), entry.getValue().right()));
+
+ return transformations.collect(Collectors.toList());
}
}