You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/20 15:09:26 UTC

[11/15] flink git commit: [hotfix] Add null checks in StateAssignmentOperation

[hotfix] Add null checks in StateAssignmentOperation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74df7631
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74df7631
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74df7631

Branch: refs/heads/master
Commit: 74df7631316e78af39a5416e12c1adc8a46d87fe
Parents: d1eaa1e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Dec 19 19:50:51 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/StateAssignmentOperation.java     | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74df7631/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 61a71e5..2e05a85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -185,7 +185,9 @@ public class StateAssignmentOperation {
 				ChainedStateHandle<StreamStateHandle> nonPartitionableState = null;
 
 				if (!parallelismChanged) {
-					nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState();
+					if (taskState.getState(subTaskIdx) != null) {
+						nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState();
+					}
 				}
 
 				// partitionable state
@@ -224,10 +226,17 @@ public class StateAssignmentOperation {
 					newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds);
 				} else {
 					SubtaskState subtaskState = taskState.getState(subTaskIdx);
-					KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
-					KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState();
-					newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList(oldKeyedStatesBackend) : null;
-					newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList(oldKeyedStatesStream) : null;
+					if (subtaskState != null) {
+						KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
+						KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState();
+						newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList(
+								oldKeyedStatesBackend) : null;
+						newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList(
+								oldKeyedStatesStream) : null;
+					} else {
+						newKeyedStatesBackend = null;
+						newKeyedStateStream = null;
+					}
 				}
 
 				TaskStateHandles taskStateHandles = new TaskStateHandles(