You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:48 UTC

[37/50] [abbrv] flink git commit: [FLINK-6034] [checkpoints] Introduce KeyedStateHandle abstraction for the snapshots in keyed streams

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index b1c94cb..8aa76a5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.util.ExceptionUtils;
@@ -30,8 +30,8 @@ import java.util.concurrent.RunnableFuture;
  */
 public class OperatorSnapshotResult {
 
-	private RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture;
-	private RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture;
+	private RunnableFuture<KeyedStateHandle> keyedStateManagedFuture;
+	private RunnableFuture<KeyedStateHandle> keyedStateRawFuture;
 	private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture;
 	private RunnableFuture<OperatorStateHandle> operatorStateRawFuture;
 
@@ -40,8 +40,8 @@ public class OperatorSnapshotResult {
 	}
 
 	public OperatorSnapshotResult(
-			RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture,
-			RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture,
+			RunnableFuture<KeyedStateHandle> keyedStateManagedFuture,
+			RunnableFuture<KeyedStateHandle> keyedStateRawFuture,
 			RunnableFuture<OperatorStateHandle> operatorStateManagedFuture,
 			RunnableFuture<OperatorStateHandle> operatorStateRawFuture) {
 		this.keyedStateManagedFuture = keyedStateManagedFuture;
@@ -50,19 +50,19 @@ public class OperatorSnapshotResult {
 		this.operatorStateRawFuture = operatorStateRawFuture;
 	}
 
-	public RunnableFuture<KeyGroupsStateHandle> getKeyedStateManagedFuture() {
+	public RunnableFuture<KeyedStateHandle> getKeyedStateManagedFuture() {
 		return keyedStateManagedFuture;
 	}
 
-	public void setKeyedStateManagedFuture(RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture) {
+	public void setKeyedStateManagedFuture(RunnableFuture<KeyedStateHandle> keyedStateManagedFuture) {
 		this.keyedStateManagedFuture = keyedStateManagedFuture;
 	}
 
-	public RunnableFuture<KeyGroupsStateHandle> getKeyedStateRawFuture() {
+	public RunnableFuture<KeyedStateHandle> getKeyedStateRawFuture() {
 		return keyedStateRawFuture;
 	}
 
-	public void setKeyedStateRawFuture(RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture) {
+	public void setKeyedStateRawFuture(RunnableFuture<KeyedStateHandle> keyedStateRawFuture) {
 		this.keyedStateRawFuture = keyedStateRawFuture;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
index 7abf8d9..30d07b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
@@ -42,16 +42,16 @@ public class OperatorStateHandles {
 
 	private final StreamStateHandle legacyOperatorState;
 
-	private final Collection<KeyGroupsStateHandle> managedKeyedState;
-	private final Collection<KeyGroupsStateHandle> rawKeyedState;
+	private final Collection<KeyedStateHandle> managedKeyedState;
+	private final Collection<KeyedStateHandle> rawKeyedState;
 	private final Collection<OperatorStateHandle> managedOperatorState;
 	private final Collection<OperatorStateHandle> rawOperatorState;
 
 	public OperatorStateHandles(
 			int operatorChainIndex,
 			StreamStateHandle legacyOperatorState,
-			Collection<KeyGroupsStateHandle> managedKeyedState,
-			Collection<KeyGroupsStateHandle> rawKeyedState,
+			Collection<KeyedStateHandle> managedKeyedState,
+			Collection<KeyedStateHandle> rawKeyedState,
 			Collection<OperatorStateHandle> managedOperatorState,
 			Collection<OperatorStateHandle> rawOperatorState) {
 
@@ -83,11 +83,11 @@ public class OperatorStateHandles {
 		return legacyOperatorState;
 	}
 
-	public Collection<KeyGroupsStateHandle> getManagedKeyedState() {
+	public Collection<KeyedStateHandle> getManagedKeyedState() {
 		return managedKeyedState;
 	}
 
-	public Collection<KeyGroupsStateHandle> getRawKeyedState() {
+	public Collection<KeyedStateHandle> getRawKeyedState() {
 		return rawKeyedState;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 76b2b98..11e8e0d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
@@ -849,8 +849,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		private final List<OperatorSnapshotResult> snapshotInProgressList;
 
-		private RunnableFuture<KeyGroupsStateHandle> futureKeyedBackendStateHandles;
-		private RunnableFuture<KeyGroupsStateHandle> futureKeyedStreamStateHandles;
+		private RunnableFuture<KeyedStateHandle> futureKeyedBackendStateHandles;
+		private RunnableFuture<KeyedStateHandle> futureKeyedStreamStateHandles;
 
 		private List<StreamStateHandle> nonPartitionedStateHandles;
 
@@ -892,8 +892,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		public void run() {
 			try {
 				// Keyed state handle future, currently only one (the head) operator can have this
-				KeyGroupsStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
-				KeyGroupsStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles);
+				KeyedStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
+				KeyedStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles);
 
 				List<OperatorStateHandle> operatorStatesBackend = new ArrayList<>(snapshotInProgressList.size());
 				List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(snapshotInProgressList.size());
@@ -987,8 +987,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				ChainedStateHandle<StreamStateHandle> chainedNonPartitionedOperatorsState,
 				ChainedStateHandle<OperatorStateHandle> chainedOperatorStateBackend,
 				ChainedStateHandle<OperatorStateHandle> chainedOperatorStateStream,
-				KeyGroupsStateHandle keyedStateHandleBackend,
-				KeyGroupsStateHandle keyedStateHandleStream) {
+				KeyedStateHandle keyedStateHandleBackend,
+				KeyedStateHandle keyedStateHandleStream) {
 
 			boolean hasAnyState = keyedStateHandleBackend != null
 					|| keyedStateHandleStream != null

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index eeee8dc..8f42c1a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -51,7 +51,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
@@ -559,11 +559,11 @@ public class AbstractStreamOperatorTest {
 
 		final CloseableRegistry closeableRegistry = new CloseableRegistry();
 
-		RunnableFuture<KeyGroupsStateHandle> futureKeyGroupStateHandle = mock(RunnableFuture.class);
+		RunnableFuture<KeyedStateHandle> futureKeyedStateHandle = mock(RunnableFuture.class);
 		RunnableFuture<OperatorStateHandle> futureOperatorStateHandle = mock(RunnableFuture.class);
 
 		StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
-		when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyGroupStateHandle);
+		when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
 		when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);
 
 		OperatorSnapshotResult operatorSnapshotResult = spy(new OperatorSnapshotResult());
@@ -609,9 +609,9 @@ public class AbstractStreamOperatorTest {
 		verify(context).close();
 		verify(operatorSnapshotResult).cancel();
 
-		verify(futureKeyGroupStateHandle).cancel(anyBoolean());
+		verify(futureKeyedStateHandle).cancel(anyBoolean());
 		verify(futureOperatorStateHandle).cancel(anyBoolean());
-		verify(futureKeyGroupStateHandle).cancel(anyBoolean());
+		verify(futureKeyedStateHandle).cancel(anyBoolean());
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
index 490df52..f57eed1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -41,12 +41,12 @@ public class OperatorSnapshotResultTest extends TestLogger {
 
 		operatorSnapshotResult.cancel();
 
-		KeyGroupsStateHandle keyedManagedStateHandle = mock(KeyGroupsStateHandle.class);
-		RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture = mock(RunnableFuture.class);
+		KeyedStateHandle keyedManagedStateHandle = mock(KeyedStateHandle.class);
+		RunnableFuture<KeyedStateHandle> keyedStateManagedFuture = mock(RunnableFuture.class);
 		when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle);
 
-		KeyGroupsStateHandle keyedRawStateHandle = mock(KeyGroupsStateHandle.class);
-		RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture = mock(RunnableFuture.class);
+		KeyedStateHandle keyedRawStateHandle = mock(KeyedStateHandle.class);
+		RunnableFuture<KeyedStateHandle> keyedStateRawFuture = mock(RunnableFuture.class);
 		when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle);
 
 		OperatorStateHandle operatorManagedStateHandle = mock(OperatorStateHandle.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index 963c42c..8e0edfc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
@@ -75,7 +76,7 @@ public class StateInitializationContextImplTest {
 
 		ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(64);
 
-		List<KeyGroupsStateHandle> keyGroupsStateHandles = new ArrayList<>(NUM_HANDLES);
+		List<KeyedStateHandle> keyedStateHandles = new ArrayList<>(NUM_HANDLES);
 		int prev = 0;
 		for (int i = 0; i < NUM_HANDLES; ++i) {
 			out.reset();
@@ -91,10 +92,10 @@ public class StateInitializationContextImplTest {
 				++writtenKeyGroups;
 			}
 
-			KeyGroupsStateHandle handle =
+			KeyedStateHandle handle =
 					new KeyGroupsStateHandle(offsets, new ByteStateHandleCloseChecking("kg-" + i, out.toByteArray()));
 
-			keyGroupsStateHandles.add(handle);
+			keyedStateHandles.add(handle);
 		}
 
 		List<OperatorStateHandle> operatorStateHandles = new ArrayList<>(NUM_HANDLES);
@@ -125,7 +126,7 @@ public class StateInitializationContextImplTest {
 						true,
 						stateStore,
 						mock(KeyedStateStore.class),
-						keyGroupsStateHandles,
+						keyedStateHandles,
 						operatorStateHandles,
 						closableRegistry);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 58cfefd..4435247 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -186,8 +187,8 @@ public class InterruptSensitiveRestoreTest {
 
 
 		ChainedStateHandle<StreamStateHandle> operatorState = null;
-		List<KeyGroupsStateHandle> keyGroupStateFromBackend = Collections.emptyList();
-		List<KeyGroupsStateHandle> keyGroupStateFromStream = Collections.emptyList();
+		List<KeyedStateHandle> keyedStateFromBackend = Collections.emptyList();
+		List<KeyedStateHandle> keyedStateFromStream = Collections.emptyList();
 		List<Collection<OperatorStateHandle>> operatorStateBackend = Collections.emptyList();
 		List<Collection<OperatorStateHandle>> operatorStateStream = Collections.emptyList();
 
@@ -201,8 +202,8 @@ public class InterruptSensitiveRestoreTest {
 		Collection<OperatorStateHandle> operatorStateHandles =
 				Collections.singletonList(new OperatorStateHandle(operatorStateMetadata, state));
 
-		List<KeyGroupsStateHandle> keyGroupsStateHandles =
-				Collections.singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state));
+		List<KeyedStateHandle> keyedStateHandles =
+				Collections.<KeyedStateHandle>singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state));
 
 		switch (mode) {
 			case OPERATOR_MANAGED:
@@ -212,10 +213,10 @@ public class InterruptSensitiveRestoreTest {
 				operatorStateStream = Collections.singletonList(operatorStateHandles);
 				break;
 			case KEYED_MANAGED:
-				keyGroupStateFromBackend = keyGroupsStateHandles;
+				keyedStateFromBackend = keyedStateHandles;
 				break;
 			case KEYED_RAW:
-				keyGroupStateFromStream = keyGroupsStateHandles;
+				keyedStateFromStream = keyedStateHandles;
 				break;
 			case LEGACY:
 				operatorState = new ChainedStateHandle<>(Collections.singletonList(state));
@@ -228,8 +229,8 @@ public class InterruptSensitiveRestoreTest {
 			operatorState,
 			operatorStateBackend,
 			operatorStateStream,
-			keyGroupStateFromBackend,
-			keyGroupStateFromStream);
+			keyedStateFromBackend,
+			keyedStateFromStream);
 
 		JobInformation jobInformation = new JobInformation(
 			new JobID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d7e3d6c..f34522b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -61,7 +61,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackendFactory;
@@ -458,8 +458,8 @@ public class StreamTaskTest extends TestLogger {
 
 		StreamOperator<?> streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 
-		KeyGroupsStateHandle managedKeyedStateHandle = mock(KeyGroupsStateHandle.class);
-		KeyGroupsStateHandle rawKeyedStateHandle = mock(KeyGroupsStateHandle.class);
+		KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class);
+		KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class);
 		OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class);
 		OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class);
 
@@ -563,8 +563,8 @@ public class StreamTaskTest extends TestLogger {
 					(ChainedStateHandle<StreamStateHandle>)invocation.getArguments()[0],
 					(ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[1],
 					(ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[2],
-					(KeyGroupsStateHandle)invocation.getArguments()[3],
-					(KeyGroupsStateHandle)invocation.getArguments()[4]);
+					(KeyedStateHandle)invocation.getArguments()[3],
+					(KeyedStateHandle)invocation.getArguments()[4]);
 			}
 		});
 
@@ -574,8 +574,8 @@ public class StreamTaskTest extends TestLogger {
 
 		StreamOperator<?> streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 
-		KeyGroupsStateHandle managedKeyedStateHandle = mock(KeyGroupsStateHandle.class);
-		KeyGroupsStateHandle rawKeyedStateHandle = mock(KeyGroupsStateHandle.class);
+		KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class);
+		KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class);
 		OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class);
 		OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 945103c..912d579 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -318,7 +319,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
 		StreamStateHandle stateHandle = SavepointV0Serializer.convertOperatorAndFunctionState(state);
 
-		List<KeyGroupsStateHandle> keyGroupStatesList = new ArrayList<>();
+		List<KeyedStateHandle> keyGroupStatesList = new ArrayList<>();
 		if (state.getKvStates() != null) {
 			KeyGroupsStateHandle keyedStateHandle = SavepointV0Serializer.convertKeyedBackendState(
 					state.getKvStates(),
@@ -331,7 +332,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		initializeState(new OperatorStateHandles(0,
 				stateHandle,
 				keyGroupStatesList,
-				Collections.<KeyGroupsStateHandle>emptyList(),
+				Collections.<KeyedStateHandle>emptyList(),
 				Collections.<OperatorStateHandle>emptyList(),
 				Collections.<OperatorStateHandle>emptyList()));
 	}
@@ -364,16 +365,16 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 			KeyGroupRange localKeyGroupRange =
 					keyGroupPartitions.get(subtaskIndex);
 
-			List<KeyGroupsStateHandle> localManagedKeyGroupState = null;
+			List<KeyedStateHandle> localManagedKeyGroupState = null;
 			if (operatorStateHandles.getManagedKeyedState() != null) {
-				localManagedKeyGroupState = StateAssignmentOperation.getKeyGroupsStateHandles(
+				localManagedKeyGroupState = StateAssignmentOperation.getKeyedStateHandles(
 						operatorStateHandles.getManagedKeyedState(),
 						localKeyGroupRange);
 			}
 
-			List<KeyGroupsStateHandle> localRawKeyGroupState = null;
+			List<KeyedStateHandle> localRawKeyGroupState = null;
 			if (operatorStateHandles.getRawKeyedState() != null) {
-				localRawKeyGroupState = StateAssignmentOperation.getKeyGroupsStateHandles(
+				localRawKeyGroupState = StateAssignmentOperation.getKeyedStateHandles(
 						operatorStateHandles.getRawKeyedState(),
 						localKeyGroupRange);
 			}
@@ -442,15 +443,15 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		List<OperatorStateHandle> mergedManagedOperatorState = new ArrayList<>(handles.length);
 		List<OperatorStateHandle> mergedRawOperatorState = new ArrayList<>(handles.length);
 
-		List<KeyGroupsStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length);
-		List<KeyGroupsStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length);
+		List<KeyedStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length);
+		List<KeyedStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length);
 
 		for (OperatorStateHandles handle: handles) {
 
 			Collection<OperatorStateHandle> managedOperatorState = handle.getManagedOperatorState();
 			Collection<OperatorStateHandle> rawOperatorState = handle.getRawOperatorState();
-			Collection<KeyGroupsStateHandle> managedKeyedState = handle.getManagedKeyedState();
-			Collection<KeyGroupsStateHandle> rawKeyedState = handle.getRawKeyedState();
+			Collection<KeyedStateHandle> managedKeyedState = handle.getManagedKeyedState();
+			Collection<KeyedStateHandle> rawKeyedState = handle.getRawKeyedState();
 
 			if (managedOperatorState != null) {
 				mergedManagedOperatorState.addAll(managedOperatorState);
@@ -502,8 +503,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 			timestamp,
 			CheckpointOptions.forFullCheckpoint());
 
-		KeyGroupsStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
-		KeyGroupsStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
+		KeyedStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
+		KeyedStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
 
 		OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
 		OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index d45ae21..d9c7387 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -65,7 +66,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 
 	// when we restore we keep the state here so that we can call restore
 	// when the operator requests the keyed state backend
-	private List<KeyGroupsStateHandle> restoredKeyedState = null;
+	private List<KeyedStateHandle> restoredKeyedState = null;
 
 	public KeyedOneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
@@ -144,7 +145,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 		}
 
 		if (keyedStateBackend != null) {
-			RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(
+			RunnableFuture<KeyedStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(
 					checkpointId,
 					timestamp,
 					streamFactory,
@@ -177,14 +178,14 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 			byte keyedStatePresent = (byte) inStream.read();
 			if (keyedStatePresent == 1) {
 				ObjectInputStream ois = new ObjectInputStream(inStream);
-				this.restoredKeyedState = Collections.singletonList((KeyGroupsStateHandle) ois.readObject());
+				this.restoredKeyedState = Collections.singletonList((KeyedStateHandle) ois.readObject());
 			}
 		}
 	}
 
 
-	private static boolean hasMigrationHandles(Collection<KeyGroupsStateHandle> allKeyGroupsHandles) {
-		for (KeyGroupsStateHandle handle : allKeyGroupsHandles) {
+	private static boolean hasMigrationHandles(Collection<KeyedStateHandle> allKeyGroupsHandles) {
+		for (KeyedStateHandle handle : allKeyGroupsHandles) {
 			if (handle instanceof Migration) {
 				return true;
 			}
@@ -225,17 +226,17 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 					keyGroupPartitions.get(subtaskIndex);
 
 			restoredKeyedState = null;
-			Collection<KeyGroupsStateHandle> managedKeyedState = operatorStateHandles.getManagedKeyedState();
+			Collection<KeyedStateHandle> managedKeyedState = operatorStateHandles.getManagedKeyedState();
 			if (managedKeyedState != null) {
 
 				// if we have migration handles, don't reshuffle state and preserve
 				// the migration tag
 				if (hasMigrationHandles(managedKeyedState)) {
-					List<KeyGroupsStateHandle> result = new ArrayList<>(managedKeyedState.size());
+					List<KeyedStateHandle> result = new ArrayList<>(managedKeyedState.size());
 					result.addAll(managedKeyedState);
 					restoredKeyedState = result;
 				} else {
-					restoredKeyedState = StateAssignmentOperation.getKeyGroupsStateHandles(
+					restoredKeyedState = StateAssignmentOperation.getKeyedStateHandles(
 							managedKeyedState,
 							localKeyGroupRange);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index 8e76f70..41a083a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.mockito.invocation.InvocationOnMock;
@@ -50,7 +51,7 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
 
 	// when we restore we keep the state here so that we can call restore
 	// when the operator requests the keyed state backend
-	private Collection<KeyGroupsStateHandle> restoredKeyedState = null;
+	private Collection<KeyedStateHandle> restoredKeyedState = null;
 
 	public KeyedTwoInputStreamOperatorTestHarness(
 			TwoInputStreamOperator<IN1, IN2, OUT> operator,