You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/06/25 06:44:51 UTC

[08/21] flink git commit: [FLINK-6742] Improve savepoint migration failure error message

[FLINK-6742] Improve savepoint migration failure error message

This closes #4083.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72b0ae06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72b0ae06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72b0ae06

Branch: refs/heads/master
Commit: 72b0ae069f8404a2f8a952e1a20004b9d340c445
Parents: 7b97371
Author: zentol <ch...@apache.org>
Authored: Wed Jun 7 12:03:21 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 23 14:14:29 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/savepoint/SavepointV2.java        | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/72b0ae06/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
index 1b2963d..5e46f93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -168,10 +168,27 @@ public class SavepointV2 implements Savepoint {
 				expandedToLegacyIds = true;
 			}
 
+			if (jobVertex == null) {
+				throw new IllegalStateException(
+					"Could not find task for state with ID " + taskState.getJobVertexID() + ". " +
+					"When migrating a savepoint from a version < 1.3 please make sure that the topology was not " +
+					"changed through removal of a stateful operator or modification of a chain containing a stateful " +
+					"operator.");
+			}
+
 			List<OperatorID> operatorIDs = jobVertex.getOperatorIDs();
 
 			for (int subtaskIndex = 0; subtaskIndex < jobVertex.getParallelism(); subtaskIndex++) {
-				SubtaskState subtaskState = taskState.getState(subtaskIndex);
+				SubtaskState subtaskState;
+				try {
+					subtaskState = taskState.getState(subtaskIndex);
+				} catch (Exception e) {
+					throw new IllegalStateException(
+						"Could not find subtask with index " + subtaskIndex + " for task " + jobVertex.getJobVertexId() + ". " +
+						"When migrating a savepoint from a version < 1.3 please make sure that no changes were made " +
+						"to the parallelism of stateful operators.",
+						e);
+				}
 
 				if (subtaskState == null) {
 					continue;