You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/01 08:01:37 UTC

[2/5] flink git commit: [FLINK-6742] Add eager checks for parallelism/chain-length change

[FLINK-6742] Add eager checks for parallelism/chain-length change


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c65317dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c65317dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c65317dc

Branch: refs/heads/release-1.3
Commit: c65317dc9619f2a5459c39278b2109137e94d79f
Parents: 1d2c615
Author: zentol <ch...@apache.org>
Authored: Mon Jun 26 13:38:54 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 30 08:51:49 2017 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/savepoint/SavepointV2.java      | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c65317dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
index 5e46f93..bd364a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -178,6 +179,18 @@ public class SavepointV2 implements Savepoint {
 
 			List<OperatorID> operatorIDs = jobVertex.getOperatorIDs();
 
+			Preconditions.checkArgument(
+				jobVertex.getParallelism() == taskState.getParallelism(),
+				"Detected change in parallelism during migration for task " + jobVertex.getJobVertexId() +"." +
+					"When migrating a savepoint from a version < 1.3 please make sure that no changes were made " +
+					"to the parallelism of stateful operators.");
+
+			Preconditions.checkArgument(
+				operatorIDs.size() == taskState.getChainLength(),
+				"Detected change in chain length during migration for task " + jobVertex.getJobVertexId() +". " +
+					"When migrating a savepoint from a version < 1.3 please make sure that the topology was not " +
+					"changed by modification of a chain containing a stateful operator.");
+
 			for (int subtaskIndex = 0; subtaskIndex < jobVertex.getParallelism(); subtaskIndex++) {
 				SubtaskState subtaskState;
 				try {