You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/07/04 07:10:35 UTC

[flink] 04/11: [FLINK-12963] [state-processor] Refactor operator state access concerns into ModifiableSavepointMetadata

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 31e827e54bcf643dc0593227aa6e491061102ccd
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jul 3 13:05:37 2019 +0800

    [FLINK-12963] [state-processor] Refactor operator state access concerns into ModifiableSavepointMetadata
---
 .../apache/flink/state/api/WritableSavepoint.java  | 39 +++++++++-------------
 .../metadata/ModifiableSavepointMetadata.java      | 29 ++++++++++++++--
 2 files changed, 42 insertions(+), 26 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
index 300d1ae..0f68cce 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
@@ -19,8 +19,10 @@ package org.apache.flink.state.api;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.state.api.output.MergeOperatorStates;
 import org.apache.flink.state.api.output.SavepointOutputFormat;
@@ -28,7 +30,6 @@ import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
 import org.apache.flink.util.Preconditions;
 
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Any savepoint that can be written to from a batch context.
@@ -79,9 +80,10 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
 	public final void write(String path) {
 		final Path savepointPath = new Path(path);
 
-		DataSet<OperatorState> newOperatorStates = getOperatorStates(savepointPath);
+		List<Tuple2<OperatorID, BootstrapTransformation<?>>> newOperatorTransformations = metadata.getNewOperatorTransformations();
+		DataSet<OperatorState> newOperatorStates = writeOperatorStates(newOperatorTransformations, savepointPath);
 
-		List<OperatorState> existingOperators = getExistingOperatorStates();
+		List<OperatorState> existingOperators = metadata.getExistingOperators();
 
 		DataSet<OperatorState> finalOperatorStates = unionOperatorStates(newOperatorStates, existingOperators);
 
@@ -92,27 +94,6 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
 			.name(path);
 	}
 
-	private List<OperatorState> getExistingOperatorStates() {
-		return metadata
-			.getOperatorStates()
-			.entrySet()
-			.stream()
-			.filter(entry -> entry.getValue().isLeft())
-			.map(entry -> entry.getValue().left())
-			.collect(Collectors.toList());
-	}
-
-	private DataSet<OperatorState> getOperatorStates(Path savepointPath) {
-		return metadata
-			.getOperatorStates()
-			.entrySet()
-			.stream()
-			.filter(entry -> entry.getValue().isRight())
-			.map(entry -> entry.getValue().right().writeOperatorState(entry.getKey(), stateBackend, metadata, savepointPath))
-			.reduce(DataSet::union)
-			.orElseThrow(() -> new IllegalStateException("Savepoint's must contain at least one operator"));
-	}
-
 	private DataSet<OperatorState> unionOperatorStates(DataSet<OperatorState> newOperatorStates, List<OperatorState> existingOperators) {
 		DataSet<OperatorState> finalOperatorStates;
 		if (existingOperators.isEmpty()) {
@@ -126,4 +107,14 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
 		}
 		return finalOperatorStates;
 	}
+
+	private DataSet<OperatorState> writeOperatorStates(
+			List<Tuple2<OperatorID, BootstrapTransformation<?>>> newOperatorTransformations,
+			Path savepointWritePath) {
+		return newOperatorTransformations
+			.stream()
+			.map(transformation -> transformation.f1.writeOperatorState(transformation.f0, stateBackend, metadata, savepointWritePath))
+			.reduce(DataSet::union)
+			.orElseThrow(() -> new IllegalStateException("Savepoints must contain at least one operator"));
+	}
 }
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
index 158fa35..8df8723 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
@@ -1,6 +1,7 @@
 package org.apache.flink.state.api.runtime.metadata;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -11,7 +12,10 @@ import org.apache.flink.types.Either;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Savepoint metadata that can be modified.
@@ -61,7 +65,28 @@ public class ModifiableSavepointMetadata extends SavepointMetadata {
 		operatorStateIndex.put(id, Either.Right(transformation));
 	}
 
-	public Map<OperatorID, Either<OperatorState, BootstrapTransformation<?>>> getOperatorStates() {
-		return operatorStateIndex;
+	/**
+	 * @return List of {@link OperatorState} that already exists within the savepoint.
+	 */
+	public List<OperatorState> getExistingOperators() {
+		return operatorStateIndex
+			.values()
+			.stream()
+			.filter(Either::isLeft)
+			.map(Either::left)
+			.collect(Collectors.toList());
+	}
+
+	/**
+	 * @return List of new operator states for the savepoint, represented by their target {@link OperatorID} and {@link BootstrapTransformation}.
+	 */
+	public List<Tuple2<OperatorID, BootstrapTransformation<?>>> getNewOperatorTransformations() {
+		Stream<Tuple2<OperatorID, BootstrapTransformation<?>>> transformations = operatorStateIndex
+			.entrySet()
+			.stream()
+			.filter(entry -> entry.getValue().isRight())
+			.map(entry -> Tuple2.of(entry.getKey(), entry.getValue().right()));
+
+		return transformations.collect(Collectors.toList());
 	}
 }