You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/07/04 07:10:38 UTC

[flink] 07/11: [FLINK-12963] [state-processor] Simplify SavepointMetadata hierarchy

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f16fed8ae5d581cb08e3121b0a68e01c4e2107b5
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Jul 4 03:15:02 2019 +0800

    [FLINK-12963] [state-processor] Simplify SavepointMetadata hierarchy
    
    From the current state of its usage, the previous hierarchy
    (ModifiableSavepointMetadata -> SavepointMetadata) is redundant. This
    commit simplifies this to just be a single SavepointMetadata class.
---
 .../apache/flink/state/api/ExistingSavepoint.java  |  6 +-
 .../org/apache/flink/state/api/NewSavepoint.java   |  4 +-
 .../java/org/apache/flink/state/api/Savepoint.java |  6 +-
 .../apache/flink/state/api/WritableSavepoint.java  |  8 +-
 .../metadata/ModifiableSavepointMetadata.java      | 88 ---------------------
 .../api/runtime/metadata/SavepointMetadata.java    | 91 +++++++++++++++++++---
 .../org/apache/flink/state/api/SavepointTest.java  |  8 +-
 7 files changed, 98 insertions(+), 113 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
index a169662..e82b579 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
@@ -37,7 +37,7 @@ import org.apache.flink.state.api.input.BroadcastStateInputFormat;
 import org.apache.flink.state.api.input.KeyedStateInputFormat;
 import org.apache.flink.state.api.input.ListStateInputFormat;
 import org.apache.flink.state.api.input.UnionStateInputFormat;
-import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -50,11 +50,11 @@ import java.io.IOException;
 public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
 	private final ExecutionEnvironment env;
 
-	private final ModifiableSavepointMetadata metadata;
+	private final SavepointMetadata metadata;
 
 	private final StateBackend stateBackend;
 
-	ExistingSavepoint(ExecutionEnvironment env, ModifiableSavepointMetadata metadata, StateBackend stateBackend) throws IOException {
+	ExistingSavepoint(ExecutionEnvironment env, SavepointMetadata metadata, StateBackend stateBackend) throws IOException {
 		super(metadata, stateBackend);
 		Preconditions.checkNotNull(env, "The execution environment must not be null");
 		Preconditions.checkNotNull(metadata, "The savepoint metadata must not be null");
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
index f42e886..dc371eb 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
@@ -20,14 +20,14 @@ package org.apache.flink.state.api;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
 
 /**
  * A new savepoint.
  */
 @PublicEvolving
 public class NewSavepoint extends WritableSavepoint<NewSavepoint> {
-	NewSavepoint(ModifiableSavepointMetadata metadata, StateBackend stateBackend) {
+	NewSavepoint(SavepointMetadata metadata, StateBackend stateBackend) {
 		super(metadata, stateBackend);
 	}
 }
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
index 9754c5b..a90d810 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.state.api.runtime.SavepointLoader;
-import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -59,7 +59,7 @@ public final class Savepoint {
 			.max(Comparator.naturalOrder())
 			.orElseThrow(() -> new RuntimeException("Savepoint's must contain at least one operator"));
 
-		ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(maxParallelism, savepoint.getMasterStates(), savepoint.getOperatorStates());
+		SavepointMetadata metadata = new SavepointMetadata(maxParallelism, savepoint.getMasterStates(), savepoint.getOperatorStates());
 		return new ExistingSavepoint(env, metadata, stateBackend);
 	}
 
@@ -76,7 +76,7 @@ public final class Savepoint {
 			"Maximum parallelism must be between 1 and " + UPPER_BOUND_MAX_PARALLELISM
 				+ ". Found: " + maxParallelism);
 
-		ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(maxParallelism, Collections.emptyList(), Collections.emptyList());
+		SavepointMetadata metadata = new SavepointMetadata(maxParallelism, Collections.emptyList(), Collections.emptyList());
 		return new NewSavepoint(metadata, stateBackend);
 	}
 }
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
index b4a29e4..fb60779 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.state.api.output.MergeOperatorStates;
 import org.apache.flink.state.api.output.SavepointOutputFormat;
 import org.apache.flink.state.api.runtime.BootstrapTransformationWithID;
-import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
 import org.apache.flink.util.Preconditions;
 
 import java.util.List;
@@ -38,11 +38,11 @@ import java.util.List;
 @SuppressWarnings("WeakerAccess")
 public abstract class WritableSavepoint<F extends WritableSavepoint> {
 
-	protected final ModifiableSavepointMetadata metadata;
+	protected final SavepointMetadata metadata;
 
 	protected final StateBackend stateBackend;
 
-	WritableSavepoint(ModifiableSavepointMetadata metadata, StateBackend stateBackend) {
+	WritableSavepoint(SavepointMetadata metadata, StateBackend stateBackend) {
 		Preconditions.checkNotNull(metadata, "The savepoint metadata must not be null");
 		Preconditions.checkNotNull(stateBackend, "The state backend must not be null");
 		this.metadata = metadata;
@@ -114,7 +114,7 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
 			.stream()
 			.map(newOperatorState -> newOperatorState
 				.getBootstrapTransformation()
-				.writeOperatorState(newOperatorState.getOperatorID(), stateBackend, metadata.maxParallelism(), savepointWritePath))
+				.writeOperatorState(newOperatorState.getOperatorID(), stateBackend, metadata.getMaxParallelism(), savepointWritePath))
 			.reduce(DataSet::union)
 			.orElseThrow(() -> new IllegalStateException("Savepoint's must contain at least one operator"));
 	}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
deleted file mode 100644
index eada085..0000000
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package org.apache.flink.state.api.runtime.metadata;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.state.api.BootstrapTransformation;
-import org.apache.flink.state.api.runtime.BootstrapTransformationWithID;
-import org.apache.flink.state.api.runtime.OperatorIDGenerator;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * Savepoint metadata that can be modified.
- */
-@Internal
-public class ModifiableSavepointMetadata extends SavepointMetadata {
-
-	private final Map<OperatorID, OperatorStateSpec> operatorStateIndex;
-
-	public ModifiableSavepointMetadata(int maxParallelism, Collection<MasterState> masterStates, Collection<OperatorState> initialStates) {
-		super(maxParallelism, masterStates);
-
-		this.operatorStateIndex = new HashMap<>(initialStates.size());
-		initialStates.forEach(existingState -> operatorStateIndex.put(
-			existingState.getOperatorID(),
-			OperatorStateSpec.existing(existingState)));
-	}
-
-	/**
-	 * @return Operator state for the given UID.
-	 *
-	 * @throws IOException If the savepoint does not contain operator state with the given uid.
-	 */
-	public OperatorState getOperatorState(String uid) throws IOException {
-		OperatorID operatorID = OperatorIDGenerator.fromUid(uid);
-
-		OperatorStateSpec operatorState = operatorStateIndex.get(operatorID);
-		if (operatorState == null || operatorState.isNewStateTransformation()) {
-			throw new IOException("Savepoint does not contain state with operator uid " + uid);
-		}
-
-		return operatorState.asExistingState();
-	}
-
-	public void removeOperator(String uid) {
-		operatorStateIndex.remove(OperatorIDGenerator.fromUid(uid));
-	}
-
-	public void addOperator(String uid, BootstrapTransformation<?> transformation) {
-		OperatorID id = OperatorIDGenerator.fromUid(uid);
-
-		if (operatorStateIndex.containsKey(id)) {
-			throw new IllegalArgumentException("The savepoint already contains uid " + uid + ". All uid's must be unique");
-		}
-
-		operatorStateIndex.put(id, OperatorStateSpec.newWithTransformation(new BootstrapTransformationWithID<>(id, transformation)));
-	}
-
-	/**
-	 * @return List of {@link OperatorState} that already exists within the savepoint.
-	 */
-	public List<OperatorState> getExistingOperators() {
-		return operatorStateIndex
-			.values()
-			.stream()
-			.filter(OperatorStateSpec::isExistingState)
-			.map(OperatorStateSpec::asExistingState)
-			.collect(Collectors.toList());
-	}
-
-	/**
-	 * @return List of new operator states for the savepoint, represented by their target {@link OperatorID} and {@link BootstrapTransformation}.
-	 */
-	public List<BootstrapTransformationWithID<?>> getNewOperators() {
-		return operatorStateIndex
-			.values()
-			.stream()
-			.filter(OperatorStateSpec::isNewStateTransformation)
-			.map(OperatorStateSpec::asNewStateTransformation)
-			.collect(Collectors.toList());
-	}
-}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
index 91efa04..b8590f2 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
@@ -20,11 +20,24 @@ package org.apache.flink.state.api.runtime.metadata;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.state.api.BootstrapTransformation;
+import org.apache.flink.state.api.runtime.BootstrapTransformationWithID;
+import org.apache.flink.state.api.runtime.OperatorIDGenerator;
+import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
 
 /**
- * Returns metadata about a savepoint.
+ * Savepoint metadata that can be modified.
  */
 @Internal
 public class SavepointMetadata {
@@ -33,22 +46,82 @@ public class SavepointMetadata {
 
 	private final Collection<MasterState> masterStates;
 
-	public SavepointMetadata(int maxParallelism, Collection<MasterState> masterStates) {
+	private final Map<OperatorID, OperatorStateSpec> operatorStateIndex;
+
+	public SavepointMetadata(int maxParallelism, Collection<MasterState> masterStates, Collection<OperatorState> initialStates) {
+		Preconditions.checkArgument(maxParallelism > 0
+				&& maxParallelism <= UPPER_BOUND_MAX_PARALLELISM,
+			"Maximum parallelism must be between 1 and " + UPPER_BOUND_MAX_PARALLELISM
+				+ ". Found: " + maxParallelism);
 		this.maxParallelism = maxParallelism;
-		this.masterStates = masterStates;
+
+		this.masterStates = Preconditions.checkNotNull(masterStates);
+
+		this.operatorStateIndex = new HashMap<>(initialStates.size());
+		initialStates.forEach(existingState -> operatorStateIndex.put(
+			existingState.getOperatorID(),
+			OperatorStateSpec.existing(existingState)));
+	}
+
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	public Collection<MasterState> getMasterStates() {
+		return masterStates;
 	}
 
 	/**
-	 * @return The max parallelism for the savepoint.
+	 * @return Operator state for the given UID.
+	 *
+	 * @throws IOException If the savepoint does not contain operator state with the given uid.
 	 */
-	public int maxParallelism() {
-		return maxParallelism;
+	public OperatorState getOperatorState(String uid) throws IOException {
+		OperatorID operatorID = OperatorIDGenerator.fromUid(uid);
+
+		OperatorStateSpec operatorState = operatorStateIndex.get(operatorID);
+		if (operatorState == null || operatorState.isNewStateTransformation()) {
+			throw new IOException("Savepoint does not contain state with operator uid " + uid);
+		}
+
+		return operatorState.asExistingState();
+	}
+
+	public void removeOperator(String uid) {
+		operatorStateIndex.remove(OperatorIDGenerator.fromUid(uid));
+	}
+
+	public void addOperator(String uid, BootstrapTransformation<?> transformation) {
+		OperatorID id = OperatorIDGenerator.fromUid(uid);
+
+		if (operatorStateIndex.containsKey(id)) {
+			throw new IllegalArgumentException("The savepoint already contains uid " + uid + ". All uid's must be unique");
+		}
+
+		operatorStateIndex.put(id, OperatorStateSpec.newWithTransformation(new BootstrapTransformationWithID<>(id, transformation)));
 	}
 
 	/**
-	 * @return Masters states for the savepoint.
+	 * @return List of {@link OperatorState} that already exists within the savepoint.
 	 */
-	public Collection<MasterState> getMasterStates() {
-		return masterStates;
+	public List<OperatorState> getExistingOperators() {
+		return operatorStateIndex
+			.values()
+			.stream()
+			.filter(OperatorStateSpec::isExistingState)
+			.map(OperatorStateSpec::asExistingState)
+			.collect(Collectors.toList());
+	}
+
+	/**
+	 * @return List of new operator states for the savepoint, represented by their target {@link OperatorID} and {@link BootstrapTransformation}.
+	 */
+	public List<BootstrapTransformationWithID<?>> getNewOperators() {
+		return operatorStateIndex
+			.values()
+			.stream()
+			.filter(OperatorStateSpec::isNewStateTransformation)
+			.map(OperatorStateSpec::asNewStateTransformation)
+			.collect(Collectors.toList());
 	}
 }
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java
index 7d34244..3d34840 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.state.api.functions.StateBootstrapFunction;
 import org.apache.flink.state.api.runtime.OperatorIDGenerator;
-import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
 
 import org.junit.Test;
 
@@ -52,7 +52,7 @@ public class SavepointTest {
 			.bootstrapWith(input)
 			.transform(new ExampleStateBootstrapFunction());
 
-		ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(1, Collections.emptyList(), Collections.emptyList());
+		SavepointMetadata metadata = new SavepointMetadata(1, Collections.emptyList(), Collections.emptyList());
 
 		new NewSavepoint(metadata, new MemoryStateBackend())
 			.withOperator(UID, transformation)
@@ -73,7 +73,7 @@ public class SavepointTest {
 		Collection<OperatorState> operatorStates = Collections.singletonList(new OperatorState(
 			OperatorIDGenerator.fromUid(UID), 1, 4));
 
-		ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(4, Collections.emptyList(), operatorStates);
+		SavepointMetadata metadata = new SavepointMetadata(4, Collections.emptyList(), operatorStates);
 
 		new ExistingSavepoint(env, metadata, new MemoryStateBackend())
 			.withOperator(UID, transformation)
@@ -94,7 +94,7 @@ public class SavepointTest {
 		Collection<OperatorState> operatorStates = Collections.singletonList(new OperatorState(
 			OperatorIDGenerator.fromUid(UID), 1, 4));
 
-		ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(4, Collections.emptyList(), operatorStates);
+		SavepointMetadata metadata = new SavepointMetadata(4, Collections.emptyList(), operatorStates);
 
 		new ExistingSavepoint(env, metadata, new MemoryStateBackend())
 			.withOperator(UID, transformation)