You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/07/04 07:10:37 UTC

[flink] 06/11: [FLINK-12963] [state-processor] Reduce dependency on SavepointMetadata in runtime components

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f820eacba4f1340e0fd2fba3b092f37f4e9c626
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Jul 4 03:05:59 2019 +0800

    [FLINK-12963] [state-processor] Reduce dependency on SavepointMetadata in runtime components
    
    Previously, we were passing around SavepointMetadata across different
    runtime classes, such as the MergeOperatorStates operator and
    BootstrapTransformation.write().
    
    This isn't necessary, as these runtime classes actually do not need
    the functionality that SavepointMetadata provides. The SavepointMetadata
    should essentially just be a data structure that maintains what existing
    operator states there are, and which operator states are added, in order
    to build the job graph.
    
    This commit changes this by only passing in the necessary information
    wrapped by SavepointMetadata to the runtime classes. By doing so, we can
    avoid making SavepointMetadata a Serializable, since it is now no longer
    part of the built job graph.
---
 .../org/apache/flink/state/api/BootstrapTransformation.java | 11 +++++------
 .../java/org/apache/flink/state/api/WritableSavepoint.java  |  4 ++--
 .../apache/flink/state/api/output/MergeOperatorStates.java  | 13 +++++++------
 .../api/runtime/metadata/ModifiableSavepointMetadata.java   |  2 +-
 .../flink/state/api/runtime/metadata/SavepointMetadata.java |  5 +----
 .../apache/flink/state/api/BootstrapTransformationTest.java | 11 ++++-------
 6 files changed, 20 insertions(+), 26 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
index edba718..c5ee559 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
@@ -37,7 +37,6 @@ import org.apache.flink.state.api.output.operators.BroadcastStateBootstrapOperat
 import org.apache.flink.state.api.output.partitioner.HashSelector;
 import org.apache.flink.state.api.output.partitioner.KeyGroupRangePartitioner;
 import org.apache.flink.state.api.runtime.BoundedStreamConfig;
-import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 
@@ -92,23 +91,23 @@ public class BootstrapTransformation<T> {
 	/**
 	 * @return The max parallelism for this operator.
 	 */
-	int getMaxParallelism(SavepointMetadata metadata) {
-		return operatorMaxParallelism.orElse(metadata.maxParallelism());
+	int getMaxParallelism(int globalMaxParallelism) {
+		return operatorMaxParallelism.orElse(globalMaxParallelism);
 	}
 
 	/**
 	 * @param operatorID The operator id for the stream operator.
 	 * @param stateBackend The state backend for the job.
-	 * @param metadata Metadata about the resulting savepoint.
+	 * @param globalMaxParallelism Global max parallelism set for the savepoint.
 	 * @param savepointPath The path where the savepoint will be written.
 	 * @return The operator subtask states for this bootstrap transformation.
 	 */
 	DataSet<OperatorState> writeOperatorState(
 		OperatorID operatorID,
 		StateBackend stateBackend,
-		SavepointMetadata metadata,
+		int globalMaxParallelism,
 		Path savepointPath) {
-		int localMaxParallelism = getMaxParallelism(metadata);
+		int localMaxParallelism = getMaxParallelism(globalMaxParallelism);
 
 		return writeOperatorSubtaskStates(operatorID, stateBackend, savepointPath, localMaxParallelism)
 			.reduceGroup(new OperatorSubtaskStateReducer(operatorID, localMaxParallelism))
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
index 512edeb..b4a29e4 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
@@ -87,7 +87,7 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
 		DataSet<OperatorState> finalOperatorStates = unionOperatorStates(newOperatorStates, existingOperators);
 
 		finalOperatorStates
-			.reduceGroup(new MergeOperatorStates(metadata))
+			.reduceGroup(new MergeOperatorStates(metadata.getMasterStates()))
 			.name("reduce(OperatorState)")
 			.output(new SavepointOutputFormat(savepointPath))
 			.name(path);
@@ -114,7 +114,7 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
 			.stream()
 			.map(newOperatorState -> newOperatorState
 				.getBootstrapTransformation()
-				.writeOperatorState(newOperatorState.getOperatorID(), stateBackend, metadata, savepointWritePath))
+				.writeOperatorState(newOperatorState.getOperatorID(), stateBackend, metadata.maxParallelism(), savepointWritePath))
 			.reduce(DataSet::union)
 			.orElseThrow(() -> new IllegalStateException("Savepoint's must contain at least one operator"));
 	}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
index f0a7793..81b49b2 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
@@ -20,13 +20,14 @@ package org.apache.flink.state.api.output;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
-import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collection;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -35,12 +36,12 @@ import java.util.stream.StreamSupport;
  */
 @Internal
 public class MergeOperatorStates implements GroupReduceFunction<OperatorState, Savepoint> {
-	private final SavepointMetadata metadata;
+	private final Collection<MasterState> masterStates;
 
-	public MergeOperatorStates(SavepointMetadata metadata) {
-		Preconditions.checkNotNull(metadata, "Savepoint metadata must not be null");
+	public MergeOperatorStates(Collection<MasterState> masterStates) {
+		Preconditions.checkNotNull(masterStates, "Master state metadata must not be null");
 
-		this.metadata = metadata;
+		this.masterStates = masterStates;
 	}
 
 	@Override
@@ -49,7 +50,7 @@ public class MergeOperatorStates implements GroupReduceFunction<OperatorState, S
 			new SavepointV2(
 				SnapshotUtils.CHECKPOINT_ID,
 				StreamSupport.stream(values.spliterator(), false).collect(Collectors.toList()),
-				metadata.getMasterStates());
+				masterStates);
 
 		out.collect(savepoint);
 	}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
index 3484a59..eada085 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
@@ -21,7 +21,7 @@ import java.util.stream.Collectors;
 @Internal
 public class ModifiableSavepointMetadata extends SavepointMetadata {
 
-	private transient Map<OperatorID, OperatorStateSpec> operatorStateIndex;
+	private final Map<OperatorID, OperatorStateSpec> operatorStateIndex;
 
 	public ModifiableSavepointMetadata(int maxParallelism, Collection<MasterState> masterStates, Collection<OperatorState> initialStates) {
 		super(maxParallelism, masterStates);
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
index 40ce66a..91efa04 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
@@ -21,16 +21,13 @@ package org.apache.flink.state.api.runtime.metadata;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.MasterState;
 
-import java.io.Serializable;
 import java.util.Collection;
 
 /**
  * Returns metadata about a savepoint.
  */
 @Internal
-public class SavepointMetadata implements Serializable {
-
-	private static final long serialVersionUID = 1L;
+public class SavepointMetadata {
 
 	private final int maxParallelism;
 
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
index 707ccde..64994c4 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
@@ -31,14 +31,11 @@ import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
 import org.apache.flink.state.api.functions.StateBootstrapFunction;
 import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
 import org.apache.flink.state.api.runtime.OperatorIDGenerator;
-import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
 import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Collections;
-
 /**
  * Tests for bootstrap transformations.
  */
@@ -55,7 +52,7 @@ public class BootstrapTransformationTest extends AbstractTestBase {
 			.bootstrapWith(input)
 			.transform(new ExampleBroadcastStateBootstrapFunction());
 
-		int maxParallelism = transformation.getMaxParallelism(new SavepointMetadata(4, Collections.emptyList()));
+		int maxParallelism = transformation.getMaxParallelism(4);
 		DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
 			OperatorIDGenerator.fromUid("uid"),
 			new MemoryStateBackend(),
@@ -79,7 +76,7 @@ public class BootstrapTransformationTest extends AbstractTestBase {
 			.bootstrapWith(input)
 			.transform(new ExampleStateBootstrapFunction());
 
-		int maxParallelism = transformation.getMaxParallelism(new SavepointMetadata(10, Collections.emptyList()));
+		int maxParallelism = transformation.getMaxParallelism(10);
 		DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
 			OperatorIDGenerator.fromUid("uid"),
 			new MemoryStateBackend(),
@@ -104,7 +101,7 @@ public class BootstrapTransformationTest extends AbstractTestBase {
 			.bootstrapWith(input)
 			.transform(new ExampleStateBootstrapFunction());
 
-		int maxParallelism = transformation.getMaxParallelism(new SavepointMetadata(4, Collections.emptyList()));
+		int maxParallelism = transformation.getMaxParallelism(4);
 		DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
 			OperatorIDGenerator.fromUid("uid"),
 			new MemoryStateBackend(),
@@ -130,7 +127,7 @@ public class BootstrapTransformationTest extends AbstractTestBase {
 			.setMaxParallelism(1)
 			.transform(new ExampleStateBootstrapFunction());
 
-		int maxParallelism = transformation.getMaxParallelism(new SavepointMetadata(4, Collections.emptyList()));
+		int maxParallelism = transformation.getMaxParallelism(4);
 		DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
 			OperatorIDGenerator.fromUid("uid"),
 			new MemoryStateBackend(),