You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/14 11:50:09 UTC

[2/5] flink git commit: [FLINK-6527] [checkpoint] OperatorSubtaskState has empty implementations of (un)/registerSharedStates

[FLINK-6527] [checkpoint] OperatorSubtaskState has empty implementations of (un)/registerSharedStates


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efbb41bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efbb41bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efbb41bc

Branch: refs/heads/master
Commit: efbb41bc633e6c72037b9dfd311b23693335844e
Parents: 958773b
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed May 10 14:57:55 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 14 13:49:50 2017 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/OperatorSubtaskState.java    | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/efbb41bc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index 863816a..49ef863 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -148,12 +148,24 @@ public class OperatorSubtaskState implements CompositeStateHandle {
 
 	@Override
 	public void registerSharedStates(SharedStateRegistry sharedStateRegistry) {
-		// No shared states
+		if (managedKeyedState != null) {
+			managedKeyedState.registerSharedStates(sharedStateRegistry);
+		}
+
+		if (rawKeyedState != null) {
+			rawKeyedState.registerSharedStates(sharedStateRegistry);
+		}
 	}
 
 	@Override
 	public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
-		// No shared states
+		if (managedKeyedState != null) {
+			managedKeyedState.unregisterSharedStates(sharedStateRegistry);
+		}
+
+		if (rawKeyedState != null) {
+			rawKeyedState.unregisterSharedStates(sharedStateRegistry);
+		}
 	}
 
 	@Override