You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/06/25 06:44:51 UTC
[08/21] flink git commit: [FLINK-6742] Improve savepoint migration
failure error message
[FLINK-6742] Improve savepoint migration failure error message
This closes #4083.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72b0ae06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72b0ae06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72b0ae06
Branch: refs/heads/master
Commit: 72b0ae069f8404a2f8a952e1a20004b9d340c445
Parents: 7b97371
Author: zentol <ch...@apache.org>
Authored: Wed Jun 7 12:03:21 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 23 14:14:29 2017 +0200
----------------------------------------------------------------------
.../checkpoint/savepoint/SavepointV2.java | 19 ++++++++++++++++++-
1 file changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/72b0ae06/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
index 1b2963d..5e46f93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -168,10 +168,27 @@ public class SavepointV2 implements Savepoint {
expandedToLegacyIds = true;
}
+ if (jobVertex == null) {
+ throw new IllegalStateException(
+ "Could not find task for state with ID " + taskState.getJobVertexID() + ". " +
+ "When migrating a savepoint from a version < 1.3 please make sure that the topology was not " +
+ "changed through removal of a stateful operator or modification of a chain containing a stateful " +
+ "operator.");
+ }
+
List<OperatorID> operatorIDs = jobVertex.getOperatorIDs();
for (int subtaskIndex = 0; subtaskIndex < jobVertex.getParallelism(); subtaskIndex++) {
- SubtaskState subtaskState = taskState.getState(subtaskIndex);
+ SubtaskState subtaskState;
+ try {
+ subtaskState = taskState.getState(subtaskIndex);
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Could not find subtask with index " + subtaskIndex + " for task " + jobVertex.getJobVertexId() + ". " +
+ "When migrating a savepoint from a version < 1.3 please make sure that no changes were made " +
+ "to the parallelism of stateful operators.",
+ e);
+ }
if (subtaskState == null) {
continue;