You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/06/25 06:44:57 UTC

[14/21] flink git commit: [FLINK-6682] [checkpoints] Improve error message in case parallelism exceeds maxParallelism

[FLINK-6682] [checkpoints] Improve error message in case parallelism exceeds maxParallelism

This closes #4125.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c736ba2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c736ba2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c736ba2e

Branch: refs/heads/master
Commit: c736ba2ef15e9e81a54a3fc02ccffadcbf594767
Parents: 7216407
Author: zhangminglei <zm...@163.com>
Authored: Tue Jun 20 19:43:44 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 23 14:14:30 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/checkpoint/StateAssignmentOperation.java   | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c736ba2e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 1042d5a..5712ea1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -464,6 +464,14 @@ public class StateAssignmentOperation {
 	private static void checkParallelismPreconditions(OperatorState operatorState, ExecutionJobVertex executionJobVertex) {
 		//----------------------------------------max parallelism preconditions-------------------------------------
 
+		if (operatorState.getMaxParallelism() < executionJobVertex.getParallelism()) {
+			throw new IllegalStateException("The state for task " + executionJobVertex.getJobVertexId() +
+				" can not be restored. The maximum parallelism (" + operatorState.getMaxParallelism() +
+				") of the restored state is lower than the configured parallelism (" + executionJobVertex.getParallelism() +
+				"). Please reduce the parallelism of the task to be lower or equal to the maximum parallelism."
+			);
+		}
+
 		// check that the number of key groups have not changed or if we need to override it to satisfy the restored state
 		if (operatorState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {