You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/31 17:28:32 UTC

[14/27] flink git commit: [FLINK-4381] Refactor State to Prepare For Key-Group State Backends

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index f9698a8..1d07bdd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -38,7 +39,11 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.AbstractCloseableHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -54,10 +59,12 @@ import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URL;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.*;
@@ -82,12 +89,9 @@ public class InterruptSensitiveRestoreTest {
 		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 		cfg.setStreamOperator(new StreamSource<>(new TestSource()));
 
-		StateHandle<Serializable> lockingHandle = new InterruptLockingStateHandle();
-		StreamTaskState opState = new StreamTaskState();
-		opState.setFunctionState(lockingHandle);
-		StreamTaskStateList taskState = new StreamTaskStateList(new StreamTaskState[] { opState });
+		StreamStateHandle lockingHandle = new InterruptLockingStateHandle();
 
-		TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(taskConfig, taskState);
+		TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(taskConfig, lockingHandle);
 		Task task = createTask(tdd);
 
 		// start the task and wait until it is in "restore"
@@ -113,7 +117,10 @@ public class InterruptSensitiveRestoreTest {
 
 	private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
 			Configuration taskConfig,
-			StateHandle<?> state) throws IOException {
+			StreamStateHandle state) throws IOException {
+
+		ChainedStateHandle<StreamStateHandle> operatorState = new ChainedStateHandle<>(Collections.singletonList(state));
+		List<KeyGroupsStateHandle> keyGroupState = Collections.emptyList();
 
 		return new TaskDeploymentDescriptor(
 				new JobID(),
@@ -131,7 +138,8 @@ public class InterruptSensitiveRestoreTest {
 				Collections.<BlobKey>emptyList(),
 				Collections.<URL>emptyList(),
 				0,
-				new SerializedValue<StateHandle<?>>(state));
+				operatorState,
+				keyGroupState);
 	}
 	
 	private static Task createTask(TaskDeploymentDescriptor tdd) throws IOException {
@@ -159,14 +167,34 @@ public class InterruptSensitiveRestoreTest {
 	// ------------------------------------------------------------------------
 
 	@SuppressWarnings("serial")
-	private static class InterruptLockingStateHandle implements StateHandle<Serializable> {
+	private static class InterruptLockingStateHandle extends AbstractCloseableHandle implements StreamStateHandle {
 
-		private transient volatile boolean closed;
-		
 		@Override
-		public Serializable getState(ClassLoader userCodeClassLoader) {
+		public FSDataInputStream openInputStream() throws Exception {
+			ensureNotClosed();
+			FSDataInputStream is = new FSDataInputStream() {
+
+				@Override
+				public void seek(long desired) throws IOException {
+				}
+
+				@Override
+				public long getPos() throws IOException {
+					return 0;
+				}
+
+				@Override
+				public int read() throws IOException {
+					block();
+					throw new EOFException();
+				}
+			};
+			registerCloseable(is);
+			return is;
+		}
+
+		private void block() {
 			IN_RESTORE_LATCH.trigger();
-			
 			// this mimics what happens in the HDFS client code.
 			// an interrupt on a waiting object leads to an infinite loop
 			try {
@@ -175,7 +203,7 @@ public class InterruptSensitiveRestoreTest {
 				}
 			}
 			catch (InterruptedException e) {
-				while (!closed) {
+				while (!isClosed()) {
 					try {
 						synchronized (this) {
 							wait();
@@ -183,8 +211,6 @@ public class InterruptSensitiveRestoreTest {
 					} catch (InterruptedException ignored) {}
 				}
 			}
-			
-			return new SerializableObject();
 		}
 
 		@Override
@@ -194,11 +220,6 @@ public class InterruptSensitiveRestoreTest {
 		public long getStateSize() throws Exception {
 			return 0;
 		}
-
-		@Override
-		public void close() throws IOException {
-			closed = true;
-		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 05b8e8c..5e82569 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -49,7 +49,9 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
 import org.mockito.invocation.InvocationOnMock;
@@ -310,7 +312,10 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
+	public void acknowledgeCheckpoint(long checkpointId,
+			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+			List<KeyGroupsStateHandle> keyGroupStateHandles) {
+
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
index cfaeaad..66bc237 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
@@ -1,236 +1,234 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.AsynchronousStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for asynchronous checkpoints.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-@SuppressWarnings("serial")
-public class StreamTaskAsyncCheckpointTest {
-
-	/**
-	 * This ensures that asynchronous state handles are actually materialized asynchonously.
-	 *
-	 * <p>We use latches to block at various stages and see if the code still continues through
-	 * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
-	 * test will simply lock forever.
-	 * @throws Exception
-	 */
-	@Test
-	public void testAsyncCheckpoints() throws Exception {
-		final OneShotLatch delayCheckpointLatch = new OneShotLatch();
-		final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
-
-		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
-		
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-		
-		streamConfig.setStreamOperator(new AsyncCheckpointOperator());
-
-		StreamMockEnvironment mockEnv = new StreamMockEnvironment(
-			testHarness.jobConfig,
-			testHarness.taskConfig,
-			testHarness.memorySize,
-			new MockInputSplitProvider(),
-			testHarness.bufferSize) {
-
-			@Override
-			public ExecutionConfig getExecutionConfig() {
-				return testHarness.executionConfig;
-			}
-
-			@Override
-			public void acknowledgeCheckpoint(long checkpointId) {
-				super.acknowledgeCheckpoint(checkpointId);
-			}
-
-			@Override
-			public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
-				super.acknowledgeCheckpoint(checkpointId, state);
-
-				// block on the latch, to verify that triggerCheckpoint returns below,
-				// even though the async checkpoint would not finish
-				try {
-					delayCheckpointLatch.await();
-				} catch (InterruptedException e) {
-					e.printStackTrace();
-				}
-
-				assertTrue(state instanceof StreamTaskStateList);
-				StreamTaskStateList stateList = (StreamTaskStateList) state;
-
-				// should be only one state
-				StreamTaskState taskState = stateList.getState(this.getUserClassLoader())[0];
-				StateHandle<?> operatorState = taskState.getOperatorState();
-				assertTrue("It must be a TestStateHandle", operatorState instanceof TestStateHandle);
-				TestStateHandle testState = (TestStateHandle) operatorState;
-				assertEquals(42, testState.checkpointId);
-				assertEquals(17, testState.timestamp);
-
-				// we now know that the checkpoint went through
-				ensureCheckpointLatch.trigger();
-			}
-		};
-
-		testHarness.invoke(mockEnv);
-
-		// wait for the task to be running
-		for (Field field: StreamTask.class.getDeclaredFields()) {
-			if (field.getName().equals("isRunning")) {
-				field.setAccessible(true);
-				while (!field.getBoolean(task)) {
-					Thread.sleep(10);
-				}
-
-			}
-		}
-
-		task.triggerCheckpoint(42, 17);
-
-		// now we allow the checkpoint
-		delayCheckpointLatch.trigger();
-
-		// wait for the checkpoint to go through
-		ensureCheckpointLatch.await();
-
-		testHarness.endInput();
-		testHarness.waitForTaskCompletion();
-	}
-
-
-	// ------------------------------------------------------------------------
-
-	public static class AsyncCheckpointOperator
-		extends AbstractStreamOperator<String>
-		implements OneInputStreamOperator<String, String> {
-		@Override
-		public void processElement(StreamRecord<String> element) throws Exception {
-			// we also don't care
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-			// not interested
-		}
-
-
-		@Override
-		public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
-			StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-
-			AsynchronousStateHandle<String> asyncState =
-				new DataInputViewAsynchronousStateHandle(checkpointId, timestamp);
-
-			taskState.setOperatorState(asyncState);
-
-			return taskState;
-		}
-
-		@Override
-		public void restoreState(StreamTaskState taskState) throws Exception {
-			super.restoreState(taskState);
-		}
-	}
-
-	private static class DataInputViewAsynchronousStateHandle extends AsynchronousStateHandle<String> {
-
-		private final long checkpointId;
-		private final long timestamp;
-
-		public DataInputViewAsynchronousStateHandle(long checkpointId, long timestamp) {
-			this.checkpointId = checkpointId;
-			this.timestamp = timestamp;
-		}
-
-		@Override
-		public StateHandle<String> materialize() throws Exception {
-			return new TestStateHandle(checkpointId, timestamp);
-		}
-
-		@Override
-		public long getStateSize() {
-			return 0;
-		}
-
-		@Override
-		public void close() throws IOException {}
-	}
-
-	private static class TestStateHandle implements StateHandle<String> {
-
-		public final long checkpointId;
-		public final long timestamp;
-
-		public TestStateHandle(long checkpointId, long timestamp) {
-			this.checkpointId = checkpointId;
-			this.timestamp = timestamp;
-		}
-
-		@Override
-		public String getState(ClassLoader userCodeClassLoader) throws Exception {
-			return null;
-		}
-
-		@Override
-		public void discardState() throws Exception {}
-
-		@Override
-		public long getStateSize() {
-			return 0;
-		}
-
-		@Override
-		public void close() throws IOException {}
-	}
-	
-	public static class DummyMapFunction<T> implements MapFunction<T, T> {
-		@Override
-		public T map(T value) { return value; }
-	}
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.streaming.runtime.tasks;
+//
+//import org.apache.flink.api.common.ExecutionConfig;
+//import org.apache.flink.api.common.functions.MapFunction;
+//import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+//import org.apache.flink.core.testutils.OneShotLatch;
+//import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+//import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+//import org.apache.flink.streaming.api.graph.StreamConfig;
+//import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+//import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+//import org.apache.flink.streaming.api.watermark.Watermark;
+//import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+//import org.junit.Test;
+//import org.junit.runner.RunWith;
+//import org.powermock.core.classloader.annotations.PowerMockIgnore;
+//import org.powermock.core.classloader.annotations.PrepareForTest;
+//import org.powermock.modules.junit4.PowerMockRunner;
+//
+//import java.io.IOException;
+//import java.lang.reflect.Field;
+//
+//import static org.junit.Assert.assertEquals;
+//import static org.junit.Assert.assertTrue;
+//
+///**
+// * Tests for asynchronous checkpoints.
+// */
+//@RunWith(PowerMockRunner.class)
+//@PrepareForTest(ResultPartitionWriter.class)
+//@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+//@SuppressWarnings("serial")
+//public class StreamTaskAsyncCheckpointTest {
+//
+//	/**
+//	 * This ensures that asynchronous state handles are actually materialized asynchronously.
+//	 *
+//	 * <p>We use latches to block at various stages and see if the code still continues through
+//	 * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
+//	 * test will simply lock forever.
+//	 * @throws Exception
+//	 */
+//	@Test
+//	public void testAsyncCheckpoints() throws Exception {
+//		final OneShotLatch delayCheckpointLatch = new OneShotLatch();
+//		final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
+//
+//		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
+//
+//		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+//
+//		StreamConfig streamConfig = testHarness.getStreamConfig();
+//
+//		streamConfig.setStreamOperator(new AsyncCheckpointOperator());
+//
+//		StreamMockEnvironment mockEnv = new StreamMockEnvironment(
+//			testHarness.jobConfig,
+//			testHarness.taskConfig,
+//			testHarness.memorySize,
+//			new MockInputSplitProvider(),
+//			testHarness.bufferSize) {
+//
+//			@Override
+//			public ExecutionConfig getExecutionConfig() {
+//				return testHarness.executionConfig;
+//			}
+//
+//			@Override
+//			public void acknowledgeCheckpoint(long checkpointId) {
+//				super.acknowledgeCheckpoint(checkpointId);
+//			}
+//
+//			@Override
+//			public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
+//				super.acknowledgeCheckpoint(checkpointId, state);
+//
+//				// block on the latch, to verify that triggerCheckpoint returns below,
+//				// even though the async checkpoint would not finish
+//				try {
+//					delayCheckpointLatch.await();
+//				} catch (InterruptedException e) {
+//					e.printStackTrace();
+//				}
+//
+//				assertTrue(state instanceof StreamTaskStateList);
+//				StreamTaskStateList stateList = (StreamTaskStateList) state;
+//
+//				// should be only one state
+//				StreamTaskState taskState = stateList.getState(this.getUserClassLoader())[0];
+//				StateHandle<?> operatorState = taskState.getOperatorState();
+//				assertTrue("It must be a TestStateHandle", operatorState instanceof TestStateHandle);
+//				TestStateHandle testState = (TestStateHandle) operatorState;
+//				assertEquals(42, testState.checkpointId);
+//				assertEquals(17, testState.timestamp);
+//
+//				// we now know that the checkpoint went through
+//				ensureCheckpointLatch.trigger();
+//			}
+//		};
+//
+//		testHarness.invoke(mockEnv);
+//
+//		// wait for the task to be running
+//		for (Field field: StreamTask.class.getDeclaredFields()) {
+//			if (field.getName().equals("isRunning")) {
+//				field.setAccessible(true);
+//				while (!field.getBoolean(task)) {
+//					Thread.sleep(10);
+//				}
+//
+//			}
+//		}
+//
+//		task.triggerCheckpoint(42, 17);
+//
+//		// now we allow the checkpoint
+//		delayCheckpointLatch.trigger();
+//
+//		// wait for the checkpoint to go through
+//		ensureCheckpointLatch.await();
+//
+//		testHarness.endInput();
+//		testHarness.waitForTaskCompletion();
+//	}
+//
+//
+//	// ------------------------------------------------------------------------
+//
+//	public static class AsyncCheckpointOperator
+//		extends AbstractStreamOperator<String>
+//		implements OneInputStreamOperator<String, String> {
+//		@Override
+//		public void processElement(StreamRecord<String> element) throws Exception {
+//			// we also don't care
+//		}
+//
+//		@Override
+//		public void processWatermark(Watermark mark) throws Exception {
+//			// not interested
+//		}
+//
+//
+//		@Override
+//		public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
+//			StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+//
+//			AsynchronousStateHandle<String> asyncState =
+//				new DataInputViewAsynchronousStateHandle(checkpointId, timestamp);
+//
+//			taskState.setOperatorState(asyncState);
+//
+//			return taskState;
+//		}
+//
+//		@Override
+//		public void restoreState(StreamTaskState taskState) throws Exception {
+//			super.restoreState(taskState);
+//		}
+//	}
+//
+//	private static class DataInputViewAsynchronousStateHandle extends AsynchronousStateHandle<String> {
+//
+//		private final long checkpointId;
+//		private final long timestamp;
+//
+//		public DataInputViewAsynchronousStateHandle(long checkpointId, long timestamp) {
+//			this.checkpointId = checkpointId;
+//			this.timestamp = timestamp;
+//		}
+//
+//		@Override
+//		public StateHandle<String> materialize() throws Exception {
+//			return new TestStateHandle(checkpointId, timestamp);
+//		}
+//
+//		@Override
+//		public long getStateSize() {
+//			return 0;
+//		}
+//
+//		@Override
+//		public void close() throws IOException {}
+//	}
+//
+//	private static class TestStateHandle implements StateHandle<String> {
+//
+//		public final long checkpointId;
+//		public final long timestamp;
+//
+//		public TestStateHandle(long checkpointId, long timestamp) {
+//			this.checkpointId = checkpointId;
+//			this.timestamp = timestamp;
+//		}
+//
+//		@Override
+//		public String getState(ClassLoader userCodeClassLoader) throws Exception {
+//			return null;
+//		}
+//
+//		@Override
+//		public void discardState() throws Exception {}
+//
+//		@Override
+//		public long getStateSize() {
+//			return 0;
+//		}
+//
+//		@Override
+//		public void close() throws IOException {}
+//	}
+//
+//	public static class DummyMapFunction<T> implements MapFunction<T, T> {
+//		@Override
+//		public T map(T value) { return value; }
+//	}
+//}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 675e7b6..bc255ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -22,24 +22,18 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -75,47 +69,41 @@ public class MockContext<IN, OUT> {
 		return output;
 	}
 
-	public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) {
+	public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) throws Exception {
 		return createAndExecuteForKeyedStream(operator, inputs, null, null);
 	}
 	
 	public static <IN, OUT, KEY> List<OUT> createAndExecuteForKeyedStream(
 				OneInputStreamOperator<IN, OUT> operator, List<IN> inputs,
-				KeySelector<IN, KEY> keySelector, TypeInformation<KEY> keyType) {
+				KeySelector<IN, KEY> keySelector, TypeInformation<KEY> keyType) throws Exception {
+
+		OneInputStreamOperatorTestHarness<IN, OUT> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(keySelector, keyType);
+
+		testHarness.setup();
+		testHarness.open();
 		
-		MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
+		operator.open();
 
-		StreamConfig config = new StreamConfig(new Configuration());
-		if (keySelector != null && keyType != null) {
-			config.setStateKeySerializer(keyType.createSerializer(new ExecutionConfig()));
-			config.setStatePartitioner(0, keySelector);
+		for (IN in: inputs) {
+			testHarness.processElement(new StreamRecord<>(in));
 		}
-		
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		final Object lock = new Object();
-		final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
-		operator.setup(mockTask, config, mockContext.output);
-		try {
-			operator.open();
-
-			StreamRecord<IN> record = new StreamRecord<IN>(null);
-			for (IN in: inputs) {
-				record = record.replace(in);
-				synchronized (lock) {
-					operator.setKeyContextElement1(record);
-					operator.processElement(record);
-				}
-			}
 
-			operator.close();
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot invoke operator.", e);
-		} finally {
-			timerService.shutdownNow();
+		testHarness.close();
+
+		ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+
+		List<OUT> result = new ArrayList<>();
+
+		for (Object o : output) {
+			if (o instanceof StreamRecord) {
+				result.add((OUT) ((StreamRecord) o).getValue());
+			}
 		}
 
-		return mockContext.getOutputs();
+		return result;
 	}
 
 	private static StreamTask<?, ?> createMockTaskWithTimer(
@@ -149,22 +137,6 @@ public class MockContext<IN, OUT> {
 			}
 		}).when(task).registerTimer(anyLong(), any(Triggerable.class));
 
-
-		try {
-			doAnswer(new Answer<AbstractStateBackend>() {
-				@Override
-				public AbstractStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
-					final String operatorIdentifier = (String) invocationOnMock.getArguments()[0];
-					final TypeSerializer<?> keySerializer = (TypeSerializer<?>) invocationOnMock.getArguments()[1];
-					MemoryStateBackend backend = MemoryStateBackend.create();
-					backend.initializeForJob(new DummyEnvironment("dummty", 1, 0), operatorIdentifier, keySerializer);
-					return backend;
-				}
-			}).when(task).createStateBackend(any(String.class), any(TypeSerializer.class));
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-
 		return task;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 6855989..6cb46d6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -27,28 +27,23 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
-import org.apache.flink.runtime.state.AsynchronousStateHandle;
-import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.HashKeyGroupAssigner;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.Serializable;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 
@@ -73,9 +68,9 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	final ConcurrentLinkedQueue<Object> outputList;
 
 	final StreamConfig config;
-	
+
 	final ExecutionConfig executionConfig;
-	
+
 	final Object checkpointLock;
 
 	final TimeServiceProvider timeServiceProvider;
@@ -89,8 +84,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	 * Whether setup() was called on the operator. This is reset when calling close().
 	 */
 	private boolean setupCalled = false;
-	
-	
+
+
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
 		this(operator, new ExecutionConfig());
 	}
@@ -107,17 +102,20 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 			TimeServiceProvider testTimeProvider) {
 		this.operator = operator;
 		this.outputList = new ConcurrentLinkedQueue<Object>();
-		this.config = new StreamConfig(new Configuration());
+		Configuration underlyingConfig = new Configuration();
+		this.config = new StreamConfig(underlyingConfig);
+		this.config.setCheckpointingEnabled(true);
 		this.executionConfig = executionConfig;
 		this.checkpointLock = new Object();
 
-		final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
+		final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig);
 		mockTask = mock(StreamTask.class);
 		timeServiceProvider = testTimeProvider;
 
 		when(mockTask.getName()).thenReturn("Mock Task");
 		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
 		when(mockTask.getConfiguration()).thenReturn(config);
+		when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
 		when(mockTask.getEnvironment()).thenReturn(env);
 		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
 		when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
@@ -173,8 +171,9 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		ClosureCleaner.clean(keySelector, false);
 		config.setStatePartitioner(0, keySelector);
 		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+		config.setKeyGroupAssigner(new HashKeyGroupAssigner<K>(10));
 	}
-	
+
 	/**
 	 * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
 	 * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
@@ -206,47 +205,30 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	}
 
 	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotOperatorState(long, long)}
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotState(org.apache.flink.core.fs.FSDataOutputStream, long, long)}
 	 */
-	public StreamTaskState snapshot(long checkpointId, long timestamp) throws Exception {
-		StreamTaskState snapshot = operator.snapshotOperatorState(checkpointId, timestamp);
-		// materialize asynchronous state handles
-		if (snapshot != null) {
-			if (snapshot.getFunctionState() instanceof AsynchronousStateHandle) {
-				AsynchronousStateHandle<Serializable> asyncState = (AsynchronousStateHandle<Serializable>) snapshot.getFunctionState();
-				snapshot.setFunctionState(asyncState.materialize());
-			}
-			if (snapshot.getOperatorState() instanceof AsynchronousStateHandle) {
-				AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) snapshot.getOperatorState();
-				snapshot.setOperatorState(asyncState.materialize());
-			}
-			if (snapshot.getKvStates() != null) {
-				Set<String> keys = snapshot.getKvStates().keySet();
-				HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = snapshot.getKvStates();
-				for (String key: keys) {
-					if (kvStates.get(key) instanceof AsynchronousKvStateSnapshot) {
-						AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asyncHandle = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStates.get(key);
-						kvStates.put(key, asyncHandle.materialize());
-					}
-				}
-			}
-
-		}
-		return snapshot;
+	public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
+		// simply use an in-memory handle
+		MemoryStateBackend backend = new MemoryStateBackend();
+		AbstractStateBackend.CheckpointStateOutputStream outStream =
+				backend.createCheckpointStateOutputStream(checkpointId, timestamp);
+		operator.snapshotState(outStream, checkpointId, timestamp);
+		return outStream.closeAndGetHandle();
 	}
 
 	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)}
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)}
 	 */
 	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
 		operator.notifyOfCompletedCheckpoint(checkpointId);
 	}
 
+
 	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(StreamTaskState)}
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)}
 	 */
-	public void restore(StreamTaskState snapshot, long recoveryTimestamp) throws Exception {
-		operator.restoreState(snapshot);
+	public void restore(StreamStateHandle snapshot) throws Exception {
+		operator.restoreState(snapshot.openInputStream());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index faf38a3..779436a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -30,7 +33,6 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
@@ -167,20 +169,20 @@ public class WindowingTestHarness<K, IN, W extends Window> {
 	/**
 	 * Takes a snapshot of the current state of the operator. This can be used to test fault-tolerance.
 	 */
-	public StreamTaskState snapshot(long checkpointId, long timestamp) throws Exception {
+	public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
 		return testHarness.snapshot(checkpointId, timestamp);
 	}
 
 	/**
-	 * Resumes execution from a provided {@link StreamTaskState}. This is used to test recovery after a failure.
+	 * Resumes execution from a provided {@link StreamStateHandle}. This is used to test recovery after a failure.
 	 */
-	public void restore(StreamTaskState snapshot, long recoveryTime) throws Exception {
+	public void restore(StreamStateHandle stateHandle) throws Exception {
 		Preconditions.checkArgument(!isOpen,
 			"You are trying to restore() while the operator is still open. " +
 				"Please call close() first.");
 
 		testHarness.setup();
-		testHarness.restore(snapshot, recoveryTime);
+		testHarness.restore(stateHandle);
 		openOperator();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 199a6af..97c8339 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -73,6 +73,8 @@ import static org.junit.Assert.*;
 @RunWith(Parameterized.class)
 public class EventTimeWindowCheckpointingITCase extends TestLogger {
 
+
+	private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
 	private static final int PARALLELISM = 4;
 
 	private static ForkableFlinkMiniCluster cluster;
@@ -109,7 +111,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 	public void initStateBackend() throws IOException {
 		switch (stateBackendEnum) {
 			case MEM:
-				this.stateBackend = new MemoryStateBackend();
+				this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE);
 				break;
 			case FILE: {
 				String backups = tempFolder.newFolder().getAbsolutePath();
@@ -119,7 +121,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 			case ROCKSDB: {
 				String rocksDb = tempFolder.newFolder().getAbsolutePath();
 				String rocksDbBackups = tempFolder.newFolder().toURI().toString();
-				RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+				RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend(MAX_MEM_STATE_SIZE));
 				rdb.setDbStoragePath(rocksDb);
 				this.stateBackend = rdb;
 				break;
@@ -127,7 +129,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 			case ROCKSDB_FULLY_ASYNC: {
 				String rocksDb = tempFolder.newFolder().getAbsolutePath();
 				String rocksDbBackups = tempFolder.newFolder().toURI().toString();
-				RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+				RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend(MAX_MEM_STATE_SIZE));
 				rdb.setDbStoragePath(rocksDb);
 				rdb.enableFullyAsyncSnapshots();
 				this.stateBackend = rdb;

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 0de2a75..8d1baeb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.state.HashKeyGroupAssigner;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -145,7 +146,7 @@ public class RescalingITCase extends TestLogger {
 			for (int key = 0; key < numberKeys; key++) {
 				int keyGroupIndex = keyGroupAssigner.getKeyGroupIndex(key);
 
-				expectedResult.add(Tuple2.of(keyGroupIndex % parallelism, numberElements * key));
+				expectedResult.add(Tuple2.of(KeyGroupRange.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupIndex), numberElements * key));
 			}
 
 			assertEquals(expectedResult, actualResult);
@@ -188,7 +189,7 @@ public class RescalingITCase extends TestLogger {
 
 			for (int key = 0; key < numberKeys; key++) {
 				int keyGroupIndex = keyGroupAssigner2.getKeyGroupIndex(key);
-				expectedResult2.add(Tuple2.of(keyGroupIndex % parallelism2, key * (numberElements + numberElements2)));
+				expectedResult2.add(Tuple2.of(KeyGroupRange.computeOperatorIndexForKeyGroup(maxParallelism, parallelism2, keyGroupIndex), key * (numberElements + numberElements2)));
 			}
 
 			assertEquals(expectedResult2, actualResult2);
@@ -351,7 +352,8 @@ public class RescalingITCase extends TestLogger {
 			for (int key = 0; key < numberKeys; key++) {
 				int keyGroupIndex = keyGroupAssigner.getKeyGroupIndex(key);
 
-				expectedResult.add(Tuple2.of(keyGroupIndex % parallelism, numberElements * key));
+//				expectedResult.add(Tuple2.of(keyGroupIndex % parallelism, numberElements * key));
+				expectedResult.add(Tuple2.of(KeyGroupRange.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupIndex) , numberElements * key));
 			}
 
 			assertEquals(expectedResult, actualResult);
@@ -401,7 +403,7 @@ public class RescalingITCase extends TestLogger {
 
 			for (int key = 0; key < numberKeys; key++) {
 				int keyGroupIndex = keyGroupAssigner2.getKeyGroupIndex(key);
-				expectedResult2.add(Tuple2.of(keyGroupIndex % parallelism2, key * (numberElements + numberElements2)));
+				expectedResult2.add(Tuple2.of(KeyGroupRange.computeOperatorIndexForKeyGroup(maxParallelism, parallelism2, keyGroupIndex), key * (numberElements + numberElements2)));
 			}
 
 			assertEquals(expectedResult2, actualResult2);

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 4fc310c..550ba75 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -30,11 +30,10 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV0;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -46,8 +45,10 @@ import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
+import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
@@ -61,8 +62,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskStateList;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.testutils.junit.RetryOnFailure;
 import org.apache.flink.testutils.junit.RetryRule;
@@ -71,7 +70,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
@@ -219,7 +217,7 @@ public class SavepointITCase extends TestLogger {
 					new RequestSavepoint(savepointPath),
 					deadline.timeLeft());
 
-			SavepointV0 savepoint = (SavepointV0) ((ResponseSavepoint) Await.result(
+			SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result(
 					savepointFuture, deadline.timeLeft())).savepoint();
 			LOG.info("Retrieved savepoint: " + savepointPath + ".");
 
@@ -334,7 +332,7 @@ public class SavepointITCase extends TestLogger {
 
 					assertNotNull(subtaskState);
 					errMsg = "Initial operator state mismatch.";
-					assertEquals(errMsg, subtaskState.getState(), tdd.getOperatorState());
+					assertEquals(errMsg, subtaskState.getChainedStateHandle(), tdd.getOperatorState());
 				}
 			}
 
@@ -345,7 +343,7 @@ public class SavepointITCase extends TestLogger {
 
 			LOG.info("Disposing savepoint " + savepointPath + ".");
 			Future<Object> disposeFuture = jobManager.ask(
-					new DisposeSavepoint(savepointPath, Option.<List<BlobKey>>empty()),
+					new DisposeSavepoint(savepointPath),
 					deadline.timeLeft());
 
 			errMsg = "Failed to dispose savepoint " + savepointPath + ".";
@@ -360,14 +358,13 @@ public class SavepointITCase extends TestLogger {
 
 			for (TaskState stateForTaskGroup : savepoint.getTaskStates()) {
 				for (SubtaskState subtaskState : stateForTaskGroup.getStates()) {
-					StreamTaskStateList taskStateList = (StreamTaskStateList) subtaskState.getState()
-						.deserializeValue(ClassLoader.getSystemClassLoader());
+					ChainedStateHandle<StreamStateHandle> streamTaskState = subtaskState.getChainedStateHandle();
 
-					for (StreamTaskState taskState : taskStateList.getState(
-						ClassLoader.getSystemClassLoader())) {
-
-						AbstractFileStateHandle fsState = (AbstractFileStateHandle) taskState.getFunctionState();
-						checkpointFiles.add(new File(fsState.getFilePath().toUri()));
+					for (int i = 0; i < streamTaskState.getLength(); i++) {
+						if (streamTaskState.get(i) != null) {
+							FileStateHandle fileStateHandle = (FileStateHandle) streamTaskState.get(i);
+							checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
+						}
 					}
 				}
 			}
@@ -499,21 +496,20 @@ public class SavepointITCase extends TestLogger {
 					new RequestSavepoint(savepointPath),
 					deadline.timeLeft());
 
-			SavepointV0 savepoint = (SavepointV0) ((ResponseSavepoint) Await.result(
+			SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result(
 					savepointFuture, deadline.timeLeft())).savepoint();
 			LOG.info("Retrieved savepoint: " + savepointPath + ".");
 
 			// Check that all checkpoint files have been removed
 			for (TaskState stateForTaskGroup : savepoint.getTaskStates()) {
 				for (SubtaskState subtaskState : stateForTaskGroup.getStates()) {
-					StreamTaskStateList taskStateList = (StreamTaskStateList) subtaskState.getState()
-							.deserializeValue(ClassLoader.getSystemClassLoader());
+					ChainedStateHandle<StreamStateHandle> streamTaskState = subtaskState.getChainedStateHandle();
 
-					for (StreamTaskState taskState : taskStateList.getState(
-							ClassLoader.getSystemClassLoader())) {
-
-						AbstractFileStateHandle fsState = (AbstractFileStateHandle) taskState.getFunctionState();
-						checkpointFiles.add(new File(fsState.getFilePath().toUri()));
+					for (int i = 0; i < streamTaskState.getLength(); i++) {
+						if (streamTaskState.get(i) != null) {
+							FileStateHandle fileStateHandle = (FileStateHandle) streamTaskState.get(i);
+							checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
+						}
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index cdc7a80..3bc3cf5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -303,7 +303,7 @@ public class ClassLoaderITCase extends TestLogger {
 
 		// Dispose savepoint
 		LOG.info("Disposing savepoint at " + savepointPath);
-		Future<Object> disposeFuture = jm.ask(new DisposeSavepoint(savepointPath, Option.apply(blobKeys)), deadline.timeLeft());
+		Future<Object> disposeFuture = jm.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
 		Object disposeResponse = Await.result(disposeFuture, deadline.timeLeft());
 
 		if (disposeResponse.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
index c7d9a42..b3e2137 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.test.state;
 
 import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateHandle;
 
+import org.apache.flink.runtime.state.StateObject;
 import org.junit.Test;
 
 import org.reflections.Reflections;
@@ -34,7 +34,7 @@ import static org.junit.Assert.*;
 public class StateHandleSerializationTest {
 
 	/**
-	 * This test validates that all subclasses of {@link StateHandle} have a proper
+	 * This test validates that all subclasses of {@link StateObject} have a proper
 	 * serial version UID.
 	 */
 	@Test
@@ -46,7 +46,7 @@ public class StateHandleSerializationTest {
 
 			@SuppressWarnings("unchecked")
 			Set<Class<?>> stateHandleImplementations = (Set<Class<?>>) (Set<?>)
-					reflections.getSubTypesOf(StateHandle.class);
+					reflections.getSubTypesOf(StateObject.class);
 
 			for (Class<?> clazz : stateHandleImplementations) {
 				validataSerialVersionUID(clazz);
@@ -73,7 +73,7 @@ public class StateHandleSerializationTest {
 	private static void validataSerialVersionUID(Class<?> clazz) {
 		// all non-interface types must have a serial version UID
 		if (!clazz.isInterface()) {
-			assertFalse("Anonymous state handle classes have problematic serialization behavior",
+			assertFalse("Anonymous state handle classes have problematic serialization behavior: " + clazz,
 					clazz.isAnonymousClass());
 
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 6288946..41455cf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -34,13 +34,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.junit.Test;
 
-import java.io.Serializable;
-
 import static org.junit.Assert.fail;
 
 public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
@@ -132,13 +129,6 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 			long timestamp) throws Exception {
 			return null;
 		}
-
-		@Override
-		public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state,
-			long checkpointID,
-			long timestamp) throws Exception {
-			return null;
-		}
 	}
 
 	static final class SuccessException extends Exception {