You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/06/25 06:44:57 UTC
[14/21] flink git commit: [FLINK-6682] [checkpoints] Improve error message in case parallelism exceeds maxParallelism
[FLINK-6682] [checkpoints] Improve error message in case parallelism exceeds maxParallelism
This closes #4125.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c736ba2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c736ba2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c736ba2e
Branch: refs/heads/master
Commit: c736ba2ef15e9e81a54a3fc02ccffadcbf594767
Parents: 7216407
Author: zhangminglei <zm...@163.com>
Authored: Tue Jun 20 19:43:44 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 23 14:14:30 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/checkpoint/StateAssignmentOperation.java | 8 ++++++++
1 file changed, 8 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c736ba2e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 1042d5a..5712ea1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -464,6 +464,14 @@ public class StateAssignmentOperation {
private static void checkParallelismPreconditions(OperatorState operatorState, ExecutionJobVertex executionJobVertex) {
//----------------------------------------max parallelism preconditions-------------------------------------
+ if (operatorState.getMaxParallelism() < executionJobVertex.getParallelism()) {
+ throw new IllegalStateException("The state for task " + executionJobVertex.getJobVertexId() +
+ " can not be restored. The maximum parallelism (" + operatorState.getMaxParallelism() +
+ ") of the restored state is lower than the configured parallelism (" + executionJobVertex.getParallelism() +
+ "). Please reduce the parallelism of the task to be lower or equal to the maximum parallelism."
+ );
+ }
+
// check that the number of key groups have not changed or if we need to override it to satisfy the restored state
if (operatorState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {