You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/07/04 07:10:38 UTC
[flink] 07/11: [FLINK-12963] [state-processor] Simplify
SavepointMetadata hierarchy
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f16fed8ae5d581cb08e3121b0a68e01c4e2107b5
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Jul 4 03:15:02 2019 +0800
[FLINK-12963] [state-processor] Simplify SavepointMetadata hierarchy
Given its current usage, the previous hierarchy
(ModifiableSavepointMetadata -> SavepointMetadata) is redundant. This
commit simplifies it into a single SavepointMetadata class.
---
.../apache/flink/state/api/ExistingSavepoint.java | 6 +-
.../org/apache/flink/state/api/NewSavepoint.java | 4 +-
.../java/org/apache/flink/state/api/Savepoint.java | 6 +-
.../apache/flink/state/api/WritableSavepoint.java | 8 +-
.../metadata/ModifiableSavepointMetadata.java | 88 ---------------------
.../api/runtime/metadata/SavepointMetadata.java | 91 +++++++++++++++++++---
.../org/apache/flink/state/api/SavepointTest.java | 8 +-
7 files changed, 98 insertions(+), 113 deletions(-)
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
index a169662..e82b579 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
@@ -37,7 +37,7 @@ import org.apache.flink.state.api.input.BroadcastStateInputFormat;
import org.apache.flink.state.api.input.KeyedStateInputFormat;
import org.apache.flink.state.api.input.ListStateInputFormat;
import org.apache.flink.state.api.input.UnionStateInputFormat;
-import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -50,11 +50,11 @@ import java.io.IOException;
public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
private final ExecutionEnvironment env;
- private final ModifiableSavepointMetadata metadata;
+ private final SavepointMetadata metadata;
private final StateBackend stateBackend;
- ExistingSavepoint(ExecutionEnvironment env, ModifiableSavepointMetadata metadata, StateBackend stateBackend) throws IOException {
+ ExistingSavepoint(ExecutionEnvironment env, SavepointMetadata metadata, StateBackend stateBackend) throws IOException {
super(metadata, stateBackend);
Preconditions.checkNotNull(env, "The execution environment must not be null");
Preconditions.checkNotNull(metadata, "The savepoint metadata must not be null");
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
index f42e886..dc371eb 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
@@ -20,14 +20,14 @@ package org.apache.flink.state.api;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
/**
* A new savepoint.
*/
@PublicEvolving
public class NewSavepoint extends WritableSavepoint<NewSavepoint> {
- NewSavepoint(ModifiableSavepointMetadata metadata, StateBackend stateBackend) {
+ NewSavepoint(SavepointMetadata metadata, StateBackend stateBackend) {
super(metadata, stateBackend);
}
}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
index 9754c5b..a90d810 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.runtime.SavepointLoader;
-import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -59,7 +59,7 @@ public final class Savepoint {
.max(Comparator.naturalOrder())
.orElseThrow(() -> new RuntimeException("Savepoint's must contain at least one operator"));
- ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(maxParallelism, savepoint.getMasterStates(), savepoint.getOperatorStates());
+ SavepointMetadata metadata = new SavepointMetadata(maxParallelism, savepoint.getMasterStates(), savepoint.getOperatorStates());
return new ExistingSavepoint(env, metadata, stateBackend);
}
@@ -76,7 +76,7 @@ public final class Savepoint {
"Maximum parallelism must be between 1 and " + UPPER_BOUND_MAX_PARALLELISM
+ ". Found: " + maxParallelism);
- ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(maxParallelism, Collections.emptyList(), Collections.emptyList());
+ SavepointMetadata metadata = new SavepointMetadata(maxParallelism, Collections.emptyList(), Collections.emptyList());
return new NewSavepoint(metadata, stateBackend);
}
}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
index b4a29e4..fb60779 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.output.MergeOperatorStates;
import org.apache.flink.state.api.output.SavepointOutputFormat;
import org.apache.flink.state.api.runtime.BootstrapTransformationWithID;
-import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
import org.apache.flink.util.Preconditions;
import java.util.List;
@@ -38,11 +38,11 @@ import java.util.List;
@SuppressWarnings("WeakerAccess")
public abstract class WritableSavepoint<F extends WritableSavepoint> {
- protected final ModifiableSavepointMetadata metadata;
+ protected final SavepointMetadata metadata;
protected final StateBackend stateBackend;
- WritableSavepoint(ModifiableSavepointMetadata metadata, StateBackend stateBackend) {
+ WritableSavepoint(SavepointMetadata metadata, StateBackend stateBackend) {
Preconditions.checkNotNull(metadata, "The savepoint metadata must not be null");
Preconditions.checkNotNull(stateBackend, "The state backend must not be null");
this.metadata = metadata;
@@ -114,7 +114,7 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
.stream()
.map(newOperatorState -> newOperatorState
.getBootstrapTransformation()
- .writeOperatorState(newOperatorState.getOperatorID(), stateBackend, metadata.maxParallelism(), savepointWritePath))
+ .writeOperatorState(newOperatorState.getOperatorID(), stateBackend, metadata.getMaxParallelism(), savepointWritePath))
.reduce(DataSet::union)
.orElseThrow(() -> new IllegalStateException("Savepoint's must contain at least one operator"));
}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
deleted file mode 100644
index eada085..0000000
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package org.apache.flink.state.api.runtime.metadata;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.state.api.BootstrapTransformation;
-import org.apache.flink.state.api.runtime.BootstrapTransformationWithID;
-import org.apache.flink.state.api.runtime.OperatorIDGenerator;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * Savepoint metadata that can be modified.
- */
-@Internal
-public class ModifiableSavepointMetadata extends SavepointMetadata {
-
- private final Map<OperatorID, OperatorStateSpec> operatorStateIndex;
-
- public ModifiableSavepointMetadata(int maxParallelism, Collection<MasterState> masterStates, Collection<OperatorState> initialStates) {
- super(maxParallelism, masterStates);
-
- this.operatorStateIndex = new HashMap<>(initialStates.size());
- initialStates.forEach(existingState -> operatorStateIndex.put(
- existingState.getOperatorID(),
- OperatorStateSpec.existing(existingState)));
- }
-
- /**
- * @return Operator state for the given UID.
- *
- * @throws IOException If the savepoint does not contain operator state with the given uid.
- */
- public OperatorState getOperatorState(String uid) throws IOException {
- OperatorID operatorID = OperatorIDGenerator.fromUid(uid);
-
- OperatorStateSpec operatorState = operatorStateIndex.get(operatorID);
- if (operatorState == null || operatorState.isNewStateTransformation()) {
- throw new IOException("Savepoint does not contain state with operator uid " + uid);
- }
-
- return operatorState.asExistingState();
- }
-
- public void removeOperator(String uid) {
- operatorStateIndex.remove(OperatorIDGenerator.fromUid(uid));
- }
-
- public void addOperator(String uid, BootstrapTransformation<?> transformation) {
- OperatorID id = OperatorIDGenerator.fromUid(uid);
-
- if (operatorStateIndex.containsKey(id)) {
- throw new IllegalArgumentException("The savepoint already contains uid " + uid + ". All uid's must be unique");
- }
-
- operatorStateIndex.put(id, OperatorStateSpec.newWithTransformation(new BootstrapTransformationWithID<>(id, transformation)));
- }
-
- /**
- * @return List of {@link OperatorState} that already exists within the savepoint.
- */
- public List<OperatorState> getExistingOperators() {
- return operatorStateIndex
- .values()
- .stream()
- .filter(OperatorStateSpec::isExistingState)
- .map(OperatorStateSpec::asExistingState)
- .collect(Collectors.toList());
- }
-
- /**
- * @return List of new operator states for the savepoint, represented by their target {@link OperatorID} and {@link BootstrapTransformation}.
- */
- public List<BootstrapTransformationWithID<?>> getNewOperators() {
- return operatorStateIndex
- .values()
- .stream()
- .filter(OperatorStateSpec::isNewStateTransformation)
- .map(OperatorStateSpec::asNewStateTransformation)
- .collect(Collectors.toList());
- }
-}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
index 91efa04..b8590f2 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
@@ -20,11 +20,24 @@ package org.apache.flink.state.api.runtime.metadata;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.state.api.BootstrapTransformation;
+import org.apache.flink.state.api.runtime.BootstrapTransformationWithID;
+import org.apache.flink.state.api.runtime.OperatorIDGenerator;
+import org.apache.flink.util.Preconditions;
+import java.io.IOException;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
/**
- * Returns metadata about a savepoint.
+ * Savepoint metadata that can be modified.
*/
@Internal
public class SavepointMetadata {
@@ -33,22 +46,82 @@ public class SavepointMetadata {
private final Collection<MasterState> masterStates;
- public SavepointMetadata(int maxParallelism, Collection<MasterState> masterStates) {
+ private final Map<OperatorID, OperatorStateSpec> operatorStateIndex;
+
+ public SavepointMetadata(int maxParallelism, Collection<MasterState> masterStates, Collection<OperatorState> initialStates) {
+ Preconditions.checkArgument(maxParallelism > 0
+ && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM,
+ "Maximum parallelism must be between 1 and " + UPPER_BOUND_MAX_PARALLELISM
+ + ". Found: " + maxParallelism);
this.maxParallelism = maxParallelism;
- this.masterStates = masterStates;
+
+ this.masterStates = Preconditions.checkNotNull(masterStates);
+
+ this.operatorStateIndex = new HashMap<>(initialStates.size());
+ initialStates.forEach(existingState -> operatorStateIndex.put(
+ existingState.getOperatorID(),
+ OperatorStateSpec.existing(existingState)));
+ }
+
+ public int getMaxParallelism() {
+ return maxParallelism;
+ }
+
+ public Collection<MasterState> getMasterStates() {
+ return masterStates;
}
/**
- * @return The max parallelism for the savepoint.
+ * @return Operator state for the given UID.
+ *
+ * @throws IOException If the savepoint does not contain operator state with the given uid.
*/
- public int maxParallelism() {
- return maxParallelism;
+ public OperatorState getOperatorState(String uid) throws IOException {
+ OperatorID operatorID = OperatorIDGenerator.fromUid(uid);
+
+ OperatorStateSpec operatorState = operatorStateIndex.get(operatorID);
+ if (operatorState == null || operatorState.isNewStateTransformation()) {
+ throw new IOException("Savepoint does not contain state with operator uid " + uid);
+ }
+
+ return operatorState.asExistingState();
+ }
+
+ public void removeOperator(String uid) {
+ operatorStateIndex.remove(OperatorIDGenerator.fromUid(uid));
+ }
+
+ public void addOperator(String uid, BootstrapTransformation<?> transformation) {
+ OperatorID id = OperatorIDGenerator.fromUid(uid);
+
+ if (operatorStateIndex.containsKey(id)) {
+ throw new IllegalArgumentException("The savepoint already contains uid " + uid + ". All uid's must be unique");
+ }
+
+ operatorStateIndex.put(id, OperatorStateSpec.newWithTransformation(new BootstrapTransformationWithID<>(id, transformation)));
}
/**
- * @return Masters states for the savepoint.
+ * @return List of {@link OperatorState} that already exists within the savepoint.
*/
- public Collection<MasterState> getMasterStates() {
- return masterStates;
+ public List<OperatorState> getExistingOperators() {
+ return operatorStateIndex
+ .values()
+ .stream()
+ .filter(OperatorStateSpec::isExistingState)
+ .map(OperatorStateSpec::asExistingState)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * @return List of new operator states for the savepoint, represented by their target {@link OperatorID} and {@link BootstrapTransformation}.
+ */
+ public List<BootstrapTransformationWithID<?>> getNewOperators() {
+ return operatorStateIndex
+ .values()
+ .stream()
+ .filter(OperatorStateSpec::isNewStateTransformation)
+ .map(OperatorStateSpec::asNewStateTransformation)
+ .collect(Collectors.toList());
}
}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java
index 7d34244..3d34840 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.functions.StateBootstrapFunction;
import org.apache.flink.state.api.runtime.OperatorIDGenerator;
-import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
import org.junit.Test;
@@ -52,7 +52,7 @@ public class SavepointTest {
.bootstrapWith(input)
.transform(new ExampleStateBootstrapFunction());
- ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(1, Collections.emptyList(), Collections.emptyList());
+ SavepointMetadata metadata = new SavepointMetadata(1, Collections.emptyList(), Collections.emptyList());
new NewSavepoint(metadata, new MemoryStateBackend())
.withOperator(UID, transformation)
@@ -73,7 +73,7 @@ public class SavepointTest {
Collection<OperatorState> operatorStates = Collections.singletonList(new OperatorState(
OperatorIDGenerator.fromUid(UID), 1, 4));
- ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(4, Collections.emptyList(), operatorStates);
+ SavepointMetadata metadata = new SavepointMetadata(4, Collections.emptyList(), operatorStates);
new ExistingSavepoint(env, metadata, new MemoryStateBackend())
.withOperator(UID, transformation)
@@ -94,7 +94,7 @@ public class SavepointTest {
Collection<OperatorState> operatorStates = Collections.singletonList(new OperatorState(
OperatorIDGenerator.fromUid(UID), 1, 4));
- ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(4, Collections.emptyList(), operatorStates);
+ SavepointMetadata metadata = new SavepointMetadata(4, Collections.emptyList(), operatorStates);
new ExistingSavepoint(env, metadata, new MemoryStateBackend())
.withOperator(UID, transformation)