You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/20 15:09:26 UTC
[11/15] flink git commit: [hotfix] Add null checks in StateAssignmentOperation
[hotfix] Add null checks in StateAssignmentOperation
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74df7631
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74df7631
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74df7631
Branch: refs/heads/master
Commit: 74df7631316e78af39a5416e12c1adc8a46d87fe
Parents: d1eaa1e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Dec 19 19:50:51 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100
----------------------------------------------------------------------
.../checkpoint/StateAssignmentOperation.java | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/74df7631/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 61a71e5..2e05a85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -185,7 +185,9 @@ public class StateAssignmentOperation {
ChainedStateHandle<StreamStateHandle> nonPartitionableState = null;
if (!parallelismChanged) {
- nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState();
+ if (taskState.getState(subTaskIdx) != null) {
+ nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState();
+ }
}
// partitionable state
@@ -224,10 +226,17 @@ public class StateAssignmentOperation {
newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds);
} else {
SubtaskState subtaskState = taskState.getState(subTaskIdx);
- KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
- KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState();
- newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList(oldKeyedStatesBackend) : null;
- newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList(oldKeyedStatesStream) : null;
+ if (subtaskState != null) {
+ KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
+ KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState();
+ newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList(
+ oldKeyedStatesBackend) : null;
+ newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList(
+ oldKeyedStatesStream) : null;
+ } else {
+ newKeyedStatesBackend = null;
+ newKeyedStateStream = null;
+ }
}
TaskStateHandles taskStateHandles = new TaskStateHandles(