Posted to commits@flink.apache.org by se...@apache.org on 2016/09/30 12:47:58 UTC

[08/10] flink git commit: [FLINK-4379] [checkpoints] Introduce rescalable operator state

[FLINK-4379] [checkpoints] Introduce rescalable operator state

This introduces the Operator State Backend, which stores state that is not partitioned
by a key. It replaces the 'Checkpointed' interface.
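
The new user-facing entry points are the CheckpointedFunction and ListCheckpointed
interfaces added by this commit (see the diffstat below). The following sketch shows how
a source function could expose redistributable, list-style operator state through
ListCheckpointed; the CountingSource class itself is a hypothetical example, not part of
this commit:

    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    import java.util.Collections;
    import java.util.List;

    public class CountingSource implements SourceFunction<Long>, ListCheckpointed<Long> {

        private static final long serialVersionUID = 1L;

        private volatile boolean running = true;
        private long count;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (running) {
                // emit and update state under the checkpoint lock for a consistent snapshot
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(count++);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public List<Long> snapshotState(long checkpointId, long timestamp) {
            // each parallel instance contributes its list elements to the operator state
            return Collections.singletonList(count);
        }

        @Override
        public void restoreState(List<Long> state) {
            // after rescaling, an instance may receive zero, one, or several sub-states
            count = 0;
            for (Long c : state) {
                count += c;
            }
        }
    }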

Additionally, this introduces CheckpointStateHandles as a container for all checkpoint-related state handles.
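
Of CheckpointStateHandles, only the getKeyGroupsStateHandle() accessor is visible in the
diffs below, so the following is a minimal sketch of the container's shape; the field
names, constructor, and remaining layout are assumptions inferred from how the three
kinds of handles are used elsewhere in this commit:

    import org.apache.flink.runtime.state.ChainedStateHandle;
    import org.apache.flink.runtime.state.KeyGroupsStateHandle;
    import org.apache.flink.runtime.state.OperatorStateHandle;
    import org.apache.flink.runtime.state.StreamStateHandle;

    import java.io.Serializable;
    import java.util.List;

    // Sketch only; the committed class may differ in names and details.
    public class CheckpointStateHandlesSketch implements Serializable {

        private static final long serialVersionUID = 1L;

        // non-partitioned (legacy 'Checkpointed') state, one handle per operator in the chain
        private final ChainedStateHandle<StreamStateHandle> nonPartitionedStateHandles;

        // repartitionable operator state, one handle per operator in the chain
        private final ChainedStateHandle<OperatorStateHandle> partitionableStateHandles;

        // keyed state, grouped by key-group ranges
        private final List<KeyGroupsStateHandle> keyGroupsStateHandle;

        public CheckpointStateHandlesSketch(
                ChainedStateHandle<StreamStateHandle> nonPartitionedStateHandles,
                ChainedStateHandle<OperatorStateHandle> partitionableStateHandles,
                List<KeyGroupsStateHandle> keyGroupsStateHandle) {
            this.nonPartitionedStateHandles = nonPartitionedStateHandles;
            this.partitionableStateHandles = partitionableStateHandles;
            this.keyGroupsStateHandle = keyGroupsStateHandle;
        }

        public List<KeyGroupsStateHandle> getKeyGroupsStateHandle() {
            return keyGroupsStateHandle;
        }
    }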

This closes #2512


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53ed6ada
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53ed6ada
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53ed6ada

Branch: refs/heads/master
Commit: 53ed6adac8cbe6b5dcb692dc9b94970f3ec5887c
Parents: 2afc092
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Aug 31 23:59:27 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 30 12:38:46 2016 +0200

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |   6 +-
 .../state/RocksDBKeyedStateBackend.java         |  75 ++--
 .../streaming/state/RocksDBStateBackend.java    |   8 +-
 .../state/RocksDBAsyncSnapshotTest.java         |  12 +-
 .../state/RocksDBStateBackendConfigTest.java    |  48 ++-
 .../api/common/functions/RuntimeContext.java    | 125 +-----
 .../util/AbstractRuntimeUDFContext.java         |  28 +-
 .../flink/api/common/state/OperatorState.java   |  70 ---
 .../flink/api/common/state/ValueState.java      |   2 +-
 .../java/typeutils/runtime/JavaSerializer.java  | 116 +++++
 .../flink/hdfstests/FileStateBackendTest.java   |  26 +-
 .../AbstractCEPBasePatternOperator.java         |   3 +-
 .../operator/AbstractCEPPatternOperator.java    |   2 -
 .../AbstractKeyedCEPPatternOperator.java        |   2 -
 .../checkpoint/CheckpointCoordinator.java       | 127 +++++-
 .../runtime/checkpoint/CompletedCheckpoint.java |   5 -
 .../checkpoint/OperatorStateRepartitioner.java  |  42 ++
 .../runtime/checkpoint/PendingCheckpoint.java   |  95 +++--
 .../RoundRobinOperatorStateRepartitioner.java   | 190 +++++++++
 .../flink/runtime/checkpoint/SubtaskState.java  |   9 -
 .../flink/runtime/checkpoint/TaskState.java     |  79 +++-
 .../savepoint/SavepointV1Serializer.java        |  97 ++++-
 .../deployment/TaskDeploymentDescriptor.java    |  50 ++-
 .../flink/runtime/execution/Environment.java    |  16 +-
 .../flink/runtime/executiongraph/Execution.java |  25 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   2 -
 .../runtime/executiongraph/ExecutionVertex.java |   6 +-
 .../runtime/jobgraph/tasks/StatefulTask.java    |  11 +-
 .../checkpoint/AcknowledgeCheckpoint.java       |  67 ++-
 .../runtime/state/AbstractCloseableHandle.java  | 126 ------
 .../state/AbstractKeyedStateBackend.java        | 342 +++++++++++++++
 .../runtime/state/AbstractStateBackend.java     |  43 +-
 .../flink/runtime/state/ChainedStateHandle.java |   7 +-
 .../runtime/state/CheckpointStateHandles.java   | 103 +++++
 .../flink/runtime/state/ClosableRegistry.java   |  84 ++++
 .../state/DefaultOperatorStateBackend.java      | 215 ++++++++++
 .../runtime/state/KeyGroupRangeOffsets.java     |   2 +
 .../runtime/state/KeyGroupsStateHandle.java     |   6 -
 .../flink/runtime/state/KeyedStateBackend.java  | 301 ++-----------
 .../runtime/state/OperatorStateBackend.java     |  35 ++
 .../runtime/state/OperatorStateHandle.java      | 109 +++++
 .../flink/runtime/state/OperatorStateStore.java |  47 +++
 ...artitionableCheckpointStateOutputStream.java |  96 +++++
 .../state/RetrievableStreamStateHandle.java     |   2 +-
 .../flink/runtime/state/SnapshotProvider.java   |  45 ++
 .../apache/flink/runtime/state/StateObject.java |   6 +-
 .../apache/flink/runtime/state/StateUtil.java   |  37 --
 .../state/filesystem/FileStateHandle.java       |   8 +-
 .../state/filesystem/FsStateBackend.java        |   6 +-
 .../state/heap/HeapKeyedStateBackend.java       | 210 ++++-----
 .../state/memory/ByteStreamStateHandle.java     |  13 +-
 .../state/memory/MemoryStateBackend.java        |   9 +-
 .../ActorGatewayCheckpointResponder.java        |  11 +-
 .../taskmanager/CheckpointResponder.java        |  15 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |  12 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  11 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 421 +++++++++++++++----
 .../checkpoint/CheckpointStateRestoreTest.java  |  46 +-
 .../CompletedCheckpointStoreTest.java           |   2 +-
 .../checkpoint/PendingCheckpointTest.java       |   2 +-
 .../checkpoint/PendingSavepointTest.java        |   2 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |   5 -
 .../checkpoint/savepoint/SavepointV1Test.java   |  20 +-
 .../stats/SimpleCheckpointStatsTrackerTest.java |   2 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  20 +-
 .../messages/CheckpointMessagesTest.java        |  17 +-
 .../operators/testutils/DummyEnvironment.java   |   3 +-
 .../operators/testutils/MockEnvironment.java    |   3 +-
 .../runtime/query/QueryableStateClientTest.java |   4 +-
 .../runtime/query/netty/KvStateClientTest.java  |   5 +-
 .../query/netty/KvStateServerHandlerTest.java   |   7 +-
 .../runtime/query/netty/KvStateServerTest.java  |   4 +-
 .../state/AbstractCloseableHandleTest.java      |  97 -----
 .../runtime/state/FileStateBackendTest.java     |  35 +-
 .../runtime/state/MemoryStateBackendTest.java   |  15 +-
 .../runtime/state/OperatorStateBackendTest.java | 155 +++++++
 .../runtime/state/StateBackendTestBase.java     | 115 ++---
 .../FsCheckpointStateOutputStreamTest.java      |  16 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   5 +-
 .../ZooKeeperStateHandleStoreITCase.java        |   4 -
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  37 +-
 .../connectors/kafka/KafkaConsumer08Test.java   |   4 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java  |  43 +-
 .../kafka/FlinkKafkaConsumerBase.java           | 182 +++++---
 .../kafka/FlinkKafkaProducerBase.java           |  27 +-
 .../kafka/internals/AbstractFetcher.java        |   4 +-
 .../kafka/AtLeastOnceProducerTest.java          |  13 +-
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 149 ++++++-
 .../connectors/kafka/KafkaConsumerTestBase.java |  30 +-
 .../kafka/testutils/MockRuntimeContext.java     |  10 -
 .../streaming/api/checkpoint/Checkpointed.java  |   1 +
 .../api/checkpoint/CheckpointedFunction.java    |  65 +++
 .../api/checkpoint/ListCheckpointed.java        |  65 +++
 .../source/ContinuousFileReaderOperator.java    |   5 +-
 .../api/operators/AbstractStreamOperator.java   |  96 ++++-
 .../operators/AbstractUdfStreamOperator.java    |  65 ++-
 .../operators/StreamCheckpointedOperator.java   |  58 +++
 .../streaming/api/operators/StreamOperator.java |  43 +-
 .../api/operators/StreamingRuntimeContext.java  |  32 --
 .../operators/GenericWriteAheadSink.java        |  25 +-
 .../windowing/EvictingWindowOperator.java       |   4 +-
 .../operators/windowing/WindowOperator.java     |  14 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  50 ++-
 .../streaming/runtime/tasks/StreamTask.java     | 314 ++++++++------
 .../operators/StreamingRuntimeContextTest.java  |   8 +-
 .../streaming/runtime/io/BarrierBufferTest.java |   8 +-
 .../runtime/io/BarrierTrackerTest.java          |  13 +-
 .../operators/StreamOperatorChainingTest.java   |  15 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |  68 +--
 .../runtime/tasks/OneInputStreamTaskTest.java   |  82 +++-
 .../runtime/tasks/StreamMockEnvironment.java    |   3 +-
 .../KeyedOneInputStreamOperatorTestHarness.java |  37 +-
 .../util/OneInputStreamOperatorTestHarness.java |  24 +-
 .../UdfStreamOperatorCheckpointingITCase.java   |  16 +-
 .../streaming/runtime/StateBackendITCase.java   |  11 +-
 115 files changed, 3981 insertions(+), 1890 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index e878ad5..9da33ef 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -156,7 +156,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 
 	private void writeKey(K key) throws IOException {
 		//write key
-		int beforeWrite = (int) keySerializationStream.getPosition();
+		int beforeWrite = keySerializationStream.getPosition();
 		backend.getKeySerializer().serialize(key, keySerializationDateDataOutputView);
 
 		if (ambiguousKeyPossible) {
@@ -166,7 +166,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 	}
 
 	private void writeNameSpace(N namespace) throws IOException {
-		int beforeWrite = (int) keySerializationStream.getPosition();
+		int beforeWrite = keySerializationStream.getPosition();
 		namespaceSerializer.serialize(namespace, keySerializationDateDataOutputView);
 
 		if (ambiguousKeyPossible) {
@@ -176,7 +176,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 	}
 
 	private void writeLengthFrom(int fromPosition) throws IOException {
-		int length = (int) (keySerializationStream.getPosition() - fromPosition);
+		int length = keySerializationStream.getPosition() - fromPosition;
 		writeVariableIntBytes(length);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index d5a96af..126ebd2 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -39,12 +39,12 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.InstantiationUtil;
@@ -73,12 +73,12 @@ import java.util.PriorityQueue;
 import java.util.concurrent.RunnableFuture;
 
 /**
- * A {@link KeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to
+ * A {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to
  * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
  * checkpointing. This state backend can store very large state that exceeds memory and spills
- * to disk.
+ * to disk. Except for snapshotting, this class should be treated as not thread-safe.
  */
-public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
+public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
 
@@ -98,9 +98,9 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 	private final File instanceRocksDBPath;
 
 	/**
-	 * Lock for protecting cleanup of the RocksDB db. We acquire this when doing asynchronous
-	 * checkpoints and when disposing the db. Otherwise, the asynchronous snapshot might try
-	 * iterating over a disposed db.
+	 * Lock for protecting cleanup of the RocksDB against the checkpointing thread. We acquire this when doing
+	 * asynchronous checkpoints and when disposing the DB. Otherwise, the asynchronous snapshot might try
+	 * iterating over a disposed DB. After acquiring the lock, always check first whether (db == null).
 	 */
 	private final SerializableObject dbDisposeLock = new SerializableObject();
 
@@ -110,13 +110,13 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 	 * instance. They all write to this instance but to their own column family.
 	 */
 	@GuardedBy("dbDisposeLock")
-	protected volatile RocksDB db;
+	protected RocksDB db;
 
 	/**
 	 * Information about the k/v states as we create them. This is used to retrieve the
 	 * column family that is used for a state and also for sanity checks when restoring.
 	 */
-	private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> kvStateInformation;
+	private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>> kvStateInformation;
 
 	/** Number of bytes required to prefix the key groups. */
 	private final int keyGroupPrefixBytes;
@@ -187,8 +187,8 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 			KeyGroupRange keyGroupRange,
 			List<KeyGroupsStateHandle> restoreState
 	) throws Exception {
-		this(
-			jobId,
+
+		this(jobId,
 			operatorIdentifier,
 			userCodeClassLoader,
 			instanceBasePath,
@@ -210,15 +210,11 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 	}
 
 	/**
-	 * @see java.io.Closeable
-	 *
-	 * Should only be called by one thread.
-	 *
-	 * @throws Exception
+	 * Should only be called by one thread, and only after all accesses to the DB have completed.
 	 */
 	@Override
-	public void close() throws Exception {
-		super.close();
+	public void dispose() {
+		super.dispose();
 
 		final RocksDB cleanupRockDBReference;
 
@@ -233,13 +229,17 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 
 		// Dispose decoupled db
 		if (cleanupRockDBReference != null) {
-			for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
+			for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> column : kvStateInformation.values()) {
 				column.f0.dispose();
 			}
 			cleanupRockDBReference.dispose();
 		}
 
-		FileUtils.deleteDirectory(instanceBasePath);
+		try {
+			FileUtils.deleteDirectory(instanceBasePath);
+		} catch (IOException ioex) {
+			LOG.info("Could not delete instance base path for RocksDB: " + instanceBasePath, ioex);
+		}
 	}
 
 	public int getKeyGroupPrefixBytes() {
@@ -248,7 +248,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 
 	/**
 	 * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
-	 * is also stopped when the backend is closed through {@link #close()}. For each backend, this method must always
+	 * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
 	 * be called by the same thread.
 	 *
 	 * @param checkpointId The Id of the checkpoint.
@@ -386,13 +386,13 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 			Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set.");
 			outStream = checkpointStreamFactory.
 					createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp);
+			stateBackend.cancelStreamRegistry.registerClosable(outStream);
 			outputView = new DataOutputViewStreamWrapper(outStream);
 		}
 
 		/**
 		 * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1).
 		 *
-		 * @return
 		 * @throws IOException
 		 */
 		public void writeDBSnapshot() throws IOException, InterruptedException {
@@ -408,7 +408,8 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 		 * @throws IOException
 		 */
 		public void closeCheckpointStream() throws IOException {
-			if(outStream != null) {
+			if (outStream != null) {
+				stateBackend.cancelStreamRegistry.unregisterClosable(outStream);
 				snapshotResultStateHandle = closeSnapshotStreamAndGetHandle();
 			}
 		}
@@ -451,7 +452,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 
 			int kvStateId = 0;
 			//iterate all column families, where each column family holds one k/v state, to write the metadata
-			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : stateBackend.kvStateInformation.entrySet()) {
+			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>> column : stateBackend.kvStateInformation.entrySet()) {
 
 				//be cooperative and check for interruption from time to time in the hot loop
 				checkInterrupted();
@@ -463,7 +464,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 				ReadOptions readOptions = new ReadOptions();
 				readOptions.setSnapshot(snapshot);
 				RocksIterator iterator = stateBackend.db.newIterator(column.getValue().f0, readOptions);
-				kvStateIterators.add(new Tuple2<RocksIterator, Integer>(iterator, kvStateId));
+				kvStateIterators.add(new Tuple2<>(iterator, kvStateId));
 				++kvStateId;
 			}
 		}
@@ -624,15 +625,16 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 				throws IOException, RocksDBException, ClassNotFoundException {
 			try {
 				currentStateHandleInStream = currentKeyGroupsStateHandle.getStateHandle().openInputStream();
+				rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
 				currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
 				restoreKVStateMetaData();
 				restoreKVStateData();
 			} finally {
-				if(currentStateHandleInStream != null) {
+				if (currentStateHandleInStream != null) {
+					rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterClosable(currentStateHandleInStream);
 					currentStateHandleInStream.close();
 				}
 			}
-
 		}
 
 		/**
@@ -652,19 +654,20 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 			//restore the empty columns for the k/v states through the metadata
 			for (int i = 0; i < numColumns; i++) {
 
-				StateDescriptor stateDescriptor = InstantiationUtil.deserializeObject(
+				StateDescriptor<?, ?> stateDescriptor = (StateDescriptor<?, ?>) InstantiationUtil.deserializeObject(
 						currentStateHandleInStream,
 						rocksDBKeyedStateBackend.userCodeClassLoader);
 
-				Tuple2<ColumnFamilyHandle, StateDescriptor> columnFamily = rocksDBKeyedStateBackend.
+				Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> columnFamily = rocksDBKeyedStateBackend.
 						kvStateInformation.get(stateDescriptor.getName());
 
-				if(null == columnFamily) {
+				if (null == columnFamily) {
 					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
 							stateDescriptor.getName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
 
-					columnFamily = new Tuple2<>(rocksDBKeyedStateBackend.db.
-							createColumnFamily(columnFamilyDescriptor), stateDescriptor);
+					columnFamily = new Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>(
+							rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor), stateDescriptor);
+
 					rocksDBKeyedStateBackend.kvStateInformation.put(stateDescriptor.getName(), columnFamily);
 				}
 
@@ -727,9 +730,9 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
 	 * that we checkpointed, i.e. is already in the map of column families.
 	 */
-	protected ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor) {
+	protected ColumnFamilyHandle getColumnFamily(StateDescriptor<?, ?> descriptor) {
 
-		Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo = kvStateInformation.get(descriptor.getName());
+		Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> stateInfo = kvStateInformation.get(descriptor.getName());
 
 		if (stateInfo != null) {
 			if (!stateInfo.f1.equals(descriptor)) {
@@ -744,7 +747,9 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 
 		try {
 			ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
-			kvStateInformation.put(descriptor.getName(), new Tuple2<>(columnFamily, descriptor));
+			Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> tuple =
+					new Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>(columnFamily, descriptor);
+			kvStateInformation.put(descriptor.getName(), tuple);
 			return columnFamily;
 		} catch (RocksDBException e) {
 			throw new RuntimeException("Error creating ColumnFamilyHandle.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index b6ce224..a0c980b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -23,11 +23,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
@@ -224,7 +224,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
-	public <K> KeyedStateBackend<K> createKeyedStateBackend(
+	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env,
 			JobID jobID,
 			String operatorIdentifier,
@@ -251,7 +251,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
-	public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment env, JobID jobID,
+	public <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend(
+			Environment env,
+			JobID jobID,
 			String operatorIdentifier,
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index c0c9ca1..bccbabc 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -29,10 +29,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
@@ -66,7 +64,6 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.Arrays;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutorService;
@@ -138,12 +135,11 @@ public class RocksDBAsyncSnapshotTest {
 			@Override
 			public void acknowledgeCheckpoint(
 					long checkpointId,
-					ChainedStateHandle<StreamStateHandle> chainedStateHandle, 
-					List<KeyGroupsStateHandle> keyGroupStateHandles,
+					CheckpointStateHandles checkpointStateHandles,
 					long synchronousDurationMillis, long asynchronousDurationMillis,
 					long bytesBufferedInAlignment, long alignmentDurationNanos) {
 
-				super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles,
+				super.acknowledgeCheckpoint(checkpointId, checkpointStateHandles,
 						synchronousDurationMillis, asynchronousDurationMillis,
 						bytesBufferedInAlignment, alignmentDurationNanos);
 
@@ -156,7 +152,7 @@ public class RocksDBAsyncSnapshotTest {
 				}
 
 				// should be only one k/v state
-				assertEquals(1, keyGroupStateHandles.size());
+				assertEquals(1, checkpointStateHandles.getKeyGroupsStateHandle().size());
 
 				// we now know that the checkpoint went through
 				ensureCheckpointLatch.trigger();

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 3b851be..07fc27c 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
-
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.util.OperatingSystem;
@@ -34,7 +33,6 @@ import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-
 import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionStyle;
@@ -45,8 +43,18 @@ import java.io.File;
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.startsWith;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 /**
@@ -88,13 +96,15 @@ public class RocksDBStateBackendConfigTest {
 		assertArrayEquals(new String[] { testDir1.getAbsolutePath(), testDir2.getAbsolutePath() }, rocksDbBackend.getDbStoragePaths());
 
 		Environment env = getMockEnvironment(new File[] {});
-		RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(env,
-				env.getJobID(),
-				"test_op",
-				IntSerializer.INSTANCE,
-				1,
-				new KeyGroupRange(0, 0),
-				env.getTaskKvStateRegistry());
+		RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
+				createKeyedStateBackend(
+						env,
+						env.getJobID(),
+						"test_op",
+						IntSerializer.INSTANCE,
+						1,
+						new KeyGroupRange(0, 0),
+						env.getTaskKvStateRegistry());
 
 
 		File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -142,13 +152,15 @@ public class RocksDBStateBackendConfigTest {
 		assertNull(rocksDbBackend.getDbStoragePaths());
 
 		Environment env = getMockEnvironment(tempDirs);
-		RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(env,
-				env.getJobID(),
-				"test_op",
-				IntSerializer.INSTANCE,
-				1,
-				new KeyGroupRange(0, 0),
-				env.getTaskKvStateRegistry());
+		RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
+				createKeyedStateBackend(
+						env,
+						env.getJobID(),
+						"test_op",
+						IntSerializer.INSTANCE,
+						1,
+						new KeyGroupRange(0, 0),
+						env.getTaskKvStateRegistry());
 
 
 		File instanceBasePath = keyedBackend.getInstanceBasePath();

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index a9e8da9..ce513cb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -18,12 +18,8 @@
 
 package org.apache.flink.api.common.functions;
 
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
@@ -33,14 +29,16 @@ import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.metrics.MetricGroup;
 
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
 /**
  * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
  * of the function will have a context through which it can access static contextual information (such as 
@@ -347,117 +345,4 @@ public interface RuntimeContext {
 	 */
 	@PublicEvolving
 	<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);
-	
-	/**
-	 * Gets the key/value state, which is only accessible if the function is executed on
-	 * a KeyedStream. Upon calling {@link ValueState#value()}, the key/value state will
-	 * return the value bound to the key of the element currently processed by the function.
-	 * Each operator may maintain multiple key/value states, addressed with different names.
-	 *
-	 * <p>Because the scope of each value is the key of the currently processed element,
-	 * and the elements are distributed by the Flink runtime, the system can transparently
-	 * scale out and redistribute the state and KeyedStream.
-	 *
-	 * <p>The following code example shows how to implement a continuous counter that counts
-	 * how many times elements of a certain key occur, and emits an updated count for that
-	 * element on each occurrence. 
-	 *
-	 * <pre>{@code
-	 * DataStream<MyType> stream = ...;
-	 * KeyedStream<MyType> keyedStream = stream.keyBy("id");     
-	 *
-	 * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
-	 *
-	 *     private State<Long> state;
-	 *
-	 *     public void open(Configuration cfg) {
-	 *         state = getRuntimeContext().getKeyValueState(Long.class, 0L);
-	 *     }
-	 *
-	 *     public Tuple2<MyType, Long> map(MyType value) {
-	 *         long count = state.value();
-	 *         state.update(value + 1);
-	 *         return new Tuple2<>(value, count);
-	 *     }
-	 * });
-	 *
-	 * }</pre>
-	 *
-	 * <p>This method attempts to deduce the type information from the given type class. If the
-	 * full type cannot be determined from the class (for example because of generic parameters),
-	 * the TypeInformation object must be manually passed via 
-	 * {@link #getKeyValueState(String, TypeInformation, Object)}. 
-	 * 
-	 *
-	 * @param name The name of the key/value state.
-	 * @param stateType The class of the type that is stored in the state. Used to generate
-	 *                  serializers for managed memory and checkpointing.
-	 * @param defaultState The default state value, returned when the state is accessed and
-	 *                     no value has yet been set for the key. May be null.
-	 *
-	 * @param <S> The type of the state.
-	 *
-	 * @return The key/value state access.
-	 *
-	 * @throws UnsupportedOperationException Thrown, if no key/value state is available for the
-	 *                                       function (function is not part os a KeyedStream).
-	 * 
-	 * @deprecated Use the more expressive {@link #getState(ValueStateDescriptor)} instead.
-	 */
-	@Deprecated
-	@PublicEvolving
-	<S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
-
-	/**
-	 * Gets the key/value state, which is only accessible if the function is executed on
-	 * a KeyedStream. Upon calling {@link ValueState#value()}, the key/value state will
-	 * return the value bound to the key of the element currently processed by the function.
-	 * Each operator may maintain multiple key/value states, addressed with different names.
-	 * 
-	 * <p>Because the scope of each value is the key of the currently processed element,
-	 * and the elements are distributed by the Flink runtime, the system can transparently
-	 * scale out and redistribute the state and KeyedStream.
-	 * 
-	 * <p>The following code example shows how to implement a continuous counter that counts
-	 * how many times elements of a certain key occur, and emits an updated count for that
-	 * element on each occurrence. 
-	 * 
-	 * <pre>{@code
-	 * DataStream<MyType> stream = ...;
-	 * KeyedStream<MyType> keyedStream = stream.keyBy("id");     
-	 * 
-	 * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
-	 * 
-	 *     private State<Long> state;
-	 *     
-	 *     public void open(Configuration cfg) {
-	 *         state = getRuntimeContext().getKeyValueState(Long.class, 0L);
-	 *     }
-	 *     
-	 *     public Tuple2<MyType, Long> map(MyType value) {
-	 *         long count = state.value();
-	 *         state.update(value + 1);
-	 *         return new Tuple2<>(value, count);
-	 *     }
-	 * });
-	 *     
-	 * }</pre>
-	 * 
-	 * @param name The name of the key/value state.
-	 * @param stateType The type information for the type that is stored in the state.
-	 *                  Used to create serializers for managed memory and checkpoints.
-	 * @param defaultState The default state value, returned when the state is accessed and
-	 *                     no value has yet been set for the key. May be null.
-	 * @param <S> The type of the state.
-	 *
-	 * @return The key/value state access.
-	 * 
-	 * @throws UnsupportedOperationException Thrown, if no key/value state is available for the
-	 *                                       function (function is not part os a KeyedStream).
-	 * 
-	 * @deprecated Use the more expressive {@link #getState(ValueStateDescriptor)} instead.
-	 */
-	@Deprecated
-	@PublicEvolving
-	<S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
 }
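
The deprecated getKeyValueState() methods removed above point to
getState(ValueStateDescriptor) as their replacement. The counter from the removed
javadoc example could be rewritten against the descriptor-based API roughly as follows
(a sketch; the CountPerKey class and its Long input type are illustrative):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    public class CountPerKey extends RichMapFunction<Long, Tuple2<Long, Long>> {

        private transient ValueState<Long> state;

        @Override
        public void open(Configuration cfg) {
            // named state with a default value of 0L for keys that were never updated
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<Long>("count", Long.class, 0L));
        }

        @Override
        public Tuple2<Long, Long> map(Long value) throws Exception {
            long count = state.value() + 1;
            state.update(count);
            return new Tuple2<>(value, count);
        }
    }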

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 6645964..4f559bf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.api.common.functions.util;
 
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.Future;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
@@ -36,15 +31,18 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.MetricGroup;
 
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Future;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -207,20 +205,4 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 		throw new UnsupportedOperationException(
 				"This state is only accessible by functions executed on a KeyedStream");
 	}
-
-	@Override
-	@Deprecated
-	@PublicEvolving
-	public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
-		throw new UnsupportedOperationException(
-				"This state is only accessible by functions executed on a KeyedStream");
-	}
-
-	@Override
-	@Deprecated
-	@PublicEvolving
-	public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
-		throw new UnsupportedOperationException(
-				"This state is only accessible by functions executed on a KeyedStream");
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
deleted file mode 100644
index ac4ed07..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.IOException;
-
-/**
- * This state interface abstracts persistent key/value state in streaming programs.
- * The state is accessed and modified by user functions, and checkpointed consistently
- * by the system as part of the distributed snapshots.
- *
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
- * automatically supplied by the system, so the function always sees the value mapped to the
- * key of the current element. That way, the system can handle stream and state partitioning
- * consistently together.
- *
- * @param <T> Type of the value in the operator state
- * 
- * @deprecated OperatorState has been replaced by {@link ValueState}.
- */
-@Deprecated
-@PublicEvolving
-public interface OperatorState<T> extends State {
-
-	/**
-	 * Returns the current value for the state. When the state is not
-	 * partitioned the returned value is the same for all inputs in a given
-	 * operator instance. If state partitioning is applied, the value returned
-	 * depends on the current operator input, as the operator maintains an
-	 * independent state for each partition.
-	 *
-	 * @return The operator state value corresponding to the current input.
-	 *
-	 * @throws IOException Thrown if the system cannot access the state.
-	 */
-	T value() throws IOException;
-
-	/**
-	 * Updates the operator state accessible by {@link #value()} to the given
-	 * value. The next time {@link #value()} is called (for the same state
-	 * partition) the returned state will represent the updated value. When a
-	 * partitioned state is updated with null, the state for the current key 
-	 * will be removed and the default value is returned on the next access.
-	 *
-	 * @param value
-	 *            The new value for the state.
-	 *
-	 * @throws IOException Thrown if the system cannot access the state.
-	 */
-	void update(T value) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
index 607cb32..de3250a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
@@ -37,7 +37,7 @@ import java.io.IOException;
  * @param <T> Type of the value in the state.
  */
 @PublicEvolving
-public interface ValueState<T> extends State, OperatorState<T> {
+public interface ValueState<T> extends State {
 
 	/**
 	 * Returns the current value for the state. When the state is not

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
new file mode 100644
index 0000000..4ae00d1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+public class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<T> duplicate() {
+		return this;
+	}
+
+	@Override
+	public T createInstance() {
+		return null;
+	}
+
+	@Override
+	public T copy(T from) {
+
+		try {
+			return InstantiationUtil.clone(from);
+		} catch (IOException | ClassNotFoundException e) {
+			throw new RuntimeException("Could not copy instance of " + from + '.', e);
+		}
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		return copy(from);
+	}
+
+	@Override
+	public int getLength() {
+		return 0;
+	}
+
+	@Override
+	public void serialize(T record, DataOutputView target) throws IOException {
+		ObjectOutputStream oos = new ObjectOutputStream(new DataOutputViewStream(target));
+		oos.writeObject(record);
+		oos.flush();
+	}
+
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		ObjectInputStream ois = new ObjectInputStream(new DataInputViewStream(source));
+
+		try {
+			@SuppressWarnings("unchecked")
+			T object = (T) ois.readObject();
+			return object;
+		} catch (ClassNotFoundException e) {
+			throw new RuntimeException("Could not deserialize object.", e);
+		}
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		// serialize() writes no length prefix (getLength() == 0 signals a variable-length format),
+		// so a stream-to-stream copy must round-trip through deserialization
+		T record = deserialize(source);
+		serialize(record, target);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof JavaSerializer && ((JavaSerializer<?>) obj).canEqual(this);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof JavaSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return getClass().hashCode();
+	}
+}
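
For reference, a minimal round trip through the new JavaSerializer, using the
DataOutputViewStreamWrapper/DataInputViewStreamWrapper classes that already appear
elsewhere in this commit (a usage sketch, not part of the commit):

    import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
    import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    public class JavaSerializerRoundTrip {

        public static void main(String[] args) throws Exception {
            JavaSerializer<String> serializer = new JavaSerializer<>();

            // write a value through the serializer into a byte buffer
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            serializer.serialize("hello", new DataOutputViewStreamWrapper(out));

            // read it back from the buffer
            ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
            String copy = serializer.deserialize(new DataInputViewStreamWrapper(in));

            System.out.println(copy); // prints "hello"
        }
    }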

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index df40998..080485e 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.hdfstests;
 
 import org.apache.commons.io.FileUtils;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.FileStatus;
@@ -31,10 +30,8 @@ import org.apache.flink.runtime.state.StateBackendTestBase;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -243,16 +240,21 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 	}
 
 	private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
-		byte[] holder = new byte[data.length];
 
-		int pos = 0;
-		int read;
-		while (pos < holder.length && (read = is.read(holder, pos, holder.length - pos)) != -1) {
-			pos += read;
-		}
+		try {
+			byte[] holder = new byte[data.length];
+
+			int pos = 0;
+			int read;
+			while (pos < holder.length && (read = is.read(holder, pos, holder.length - pos)) != -1) {
+				pos += read;
+			}
 
-		assertEquals("not enough data", holder.length, pos);
-		assertEquals("too much data", -1, is.read());
-		assertArrayEquals("wrong data", data, holder);
+			assertEquals("not enough data", holder.length, pos);
+			assertEquals("too much data", -1, is.read());
+			assertArrayEquals("wrong data", data, holder);
+		} finally {
+			is.close();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
index aad408c..2f21346 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -36,7 +37,7 @@ import java.util.PriorityQueue;
  */
 public abstract class AbstractCEPBasePatternOperator<IN, OUT>
 	extends AbstractStreamOperator<OUT>
-	implements OneInputStreamOperator<IN, OUT> {
+	implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator {
 
 	private static final long serialVersionUID = -4166778210774160757L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 64ffa2a..10bb6ff 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -104,7 +104,6 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 
 	@Override
 	public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
-		super.snapshotState(out, checkpointId, timestamp);
 		final ObjectOutputStream oos = new ObjectOutputStream(out);
 
 		oos.writeObject(nfa);
@@ -119,7 +118,6 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 	@Override
 	@SuppressWarnings("unchecked")
 	public void restoreState(FSDataInputStream state) throws Exception {
-		super.restoreState(state);
 		final ObjectInputStream ois = new ObjectInputStream(state);
 
 		nfa = (NFA<IN>)ois.readObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 09773a2..07e2662 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -187,7 +187,6 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 
 	@Override
 	public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
-		super.snapshotState(out, checkpointId, timestamp);
 
 		DataOutputView ov = new DataOutputViewStreamWrapper(out);
 		ov.writeInt(keys.size());
@@ -199,7 +198,6 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 
 	@Override
 	public void restoreState(FSDataInputStream state) throws Exception {
-		super.restoreState(state);
 
 		DataInputView inputView = new DataInputViewStreamWrapper(state);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6a43ddf..4428427 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -34,9 +34,11 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -45,7 +47,9 @@ import scala.concurrent.Future;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -398,9 +402,9 @@ public class CheckpointCoordinator {
 				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
 			}
 
-		final PendingCheckpoint checkpoint = props.isSavepoint() ?
-			new PendingSavepoint(job, checkpointID, timestamp, ackTasks, savepointStore) :
-			new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
+			final PendingCheckpoint checkpoint = props.isSavepoint() ?
+					new PendingSavepoint(job, checkpointID, timestamp, ackTasks, savepointStore) :
+					new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
 
 			// schedule the timer that will clean up the expired checkpoints
 			TimerTask canceller = new TimerTask() {
@@ -627,9 +631,8 @@ public class CheckpointCoordinator {
 				isPendingCheckpoint = true;
 
 				if (checkpoint.acknowledgeTask(
-					message.getTaskExecutionId(),
-					message.getStateHandle(),
-					message.getKeyGroupsStateHandle())) {
+						message.getTaskExecutionId(),
+						message.getCheckpointStateHandles())) {
 					if (checkpoint.isFullyAcknowledged()) {
 						completed = checkpoint.finalizeCheckpoint();
 
@@ -640,7 +643,7 @@ public class CheckpointCoordinator {
 
 						if (LOG.isDebugEnabled()) {
 							StringBuilder builder = new StringBuilder();
-							for (Map.Entry<JobVertexID, TaskState> entry: completed.getTaskStates().entrySet()) {
+							for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) {
 								builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
 							}
 
@@ -654,8 +657,7 @@ public class CheckpointCoordinator {
 
 						triggerQueuedRequests();
 					}
-				}
-				else {
+				} else {
 					// checkpoint did not accept message
 					LOG.error("Received duplicate or invalid acknowledge message for checkpoint " + checkpointId
 							+ " , task " + message.getTaskExecutionId());
@@ -790,22 +792,80 @@ public class CheckpointCoordinator {
 					}
 
 
+					int oldParallelism = taskState.getParallelism();
+					int newParallelism = executionJobVertex.getParallelism();
+					boolean parallelismChanged = oldParallelism != newParallelism;
 					boolean hasNonPartitionedState = taskState.hasNonPartitionedState();
 
-					if (hasNonPartitionedState && taskState.getParallelism() != executionJobVertex.getParallelism()) {
+					if (hasNonPartitionedState && parallelismChanged) {
 						throw new IllegalStateException("Cannot restore the latest checkpoint because " +
 							"the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " +
 							"state and its parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
-							" has parallelism " + executionJobVertex.getParallelism() + " whereas the corresponding" +
-							"state object has a parallelism of " + taskState.getParallelism());
+							" has parallelism " + newParallelism + " whereas the corresponding " +
+							"state object has a parallelism of " + oldParallelism);
 					}
 
 					List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions(
-						executionJobVertex.getMaxParallelism(),
-						executionJobVertex.getParallelism());
+							executionJobVertex.getMaxParallelism(),
+							newParallelism);
+
+					// operator chain index -> list of the stored partitionable states from all parallel instances
+					@SuppressWarnings("unchecked")
+					List<OperatorStateHandle>[] chainParallelStates =
+							new List[taskState.getChainLength()];
+
+					for (int i = 0; i < oldParallelism; ++i) {
+
+						ChainedStateHandle<OperatorStateHandle> partitionableState =
+								taskState.getPartitionableState(i);
+
+						if (partitionableState != null) {
+							for (int j = 0; j < partitionableState.getLength(); ++j) {
+								OperatorStateHandle opParallelState = partitionableState.get(j);
+								if (opParalleState != null) {
+									List<OperatorStateHandle> opParallelStates =
+											chainParallelStates[j];
+									if (opParallelStates == null) {
+										opParallelStates = new ArrayList<>();
+										chainParallelStates[j] = opParallelStates;
+									}
+									opParallelStates.add(opParallelState);
+								}
+							}
+						}
+					}
+
+					// operator chain index -> lists with collected states (one collection for each parallel subtask)
+					@SuppressWarnings("unchecked")
+					List<Collection<OperatorStateHandle>>[] redistributedParallelStates =
+							new List[taskState.getChainLength()];
+
+					//TODO here we can employ different redistribution strategies for state, e.g. union state. For now we only offer round robin as the default.
+					OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
+
+					for (int i = 0; i < chainParallelStates.length; ++i) {
+						List<OperatorStateHandle> chainOpParallelStates = chainParallelStates[i];
+						if (chainOpParallelStates != null) {
+							// We only redistribute if the parallelism of the operator changed from previous executions
+							if (parallelismChanged) {
+								redistributedParallelStates[i] = repartitioner.repartitionState(
+										chainOpParallelStates,
+										newParallelism);
+							} else {
+								List<Collection<OperatorStateHandle>> repacking = new ArrayList<>(newParallelism);
+								for (OperatorStateHandle operatorStateHandle : chainOpParallelStates) {
+									repacking.add(Collections.singletonList(operatorStateHandle));
+								}
+								redistributedParallelStates[i] = repacking;
+							}
+						}
+					}
 
 					int counter = 0;
-					for (int i = 0; i < executionJobVertex.getParallelism(); i++) {
+
+					for (int i = 0; i < newParallelism; ++i) {
+
+						// non-partitioned state
 						ChainedStateHandle<StreamStateHandle> state = null;
 
 						if (hasNonPartitionedState) {
@@ -813,25 +873,46 @@ public class CheckpointCoordinator {
 
 							if (subtaskState != null) {
 								// count the number of executions for which we set a state
-								counter++;
+								++counter;
 								state = subtaskState.getChainedStateHandle();
 							}
 						}
 
+						// partitionable state
+						@SuppressWarnings("unchecked")
+						Collection<OperatorStateHandle>[] subTaskStateArray = new Collection[taskState.getChainLength()];
+						List<Collection<OperatorStateHandle>> subTaskPartitionableState = Arrays.asList(subTaskStateArray);
+
+						for (int j = 0; j < redistributedParallelStates.length; ++j) {
+							List<Collection<OperatorStateHandle>> redistributedParallelState =
+									redistributedParallelStates[j];
+
+							if (redistributedParallelState != null) {
+								subTaskPartitionableState.set(j, redistributedParallelState.get(i));
+							}
+						}
+
+						// key-partitioned state
 						KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(i);
 
-						List<KeyGroupsStateHandle> subtaskKeyGroupStates = getKeyGroupsStateHandles(
-								taskState.getKeyGroupStates(),
-								subtaskKeyGroupIds);
+						// Again, we only repartition if the parallelism changed
+						List<KeyGroupsStateHandle> subtaskKeyGroupStates = parallelismChanged ?
+								getKeyGroupsStateHandles(taskState.getKeyGroupStates(), subtaskKeyGroupIds)
+								: Collections.singletonList(taskState.getKeyGroupState(i));
 
 						Execution currentExecutionAttempt = executionJobVertex
 							.getTaskVertices()[i]
 							.getCurrentExecutionAttempt();
 
-						currentExecutionAttempt.setInitialState(state, subtaskKeyGroupStates);
+						CheckpointStateHandles checkpointStateHandles = new CheckpointStateHandles(
+								state,
+								null/*subTaskPartitionableState*/, //TODO choose the right structure and put redistributed states here
+								subtaskKeyGroupStates);
+
+						currentExecutionAttempt.setInitialState(checkpointStateHandles, subTaskPartitionableState);
 					}
 
-					if (allOrNothingState && counter > 0 && counter < executionJobVertex.getParallelism()) {
+					if (allOrNothingState && counter > 0 && counter < newParallelism) {
 						throw new IllegalStateException("The checkpoint contained state only for " +
 							"a subset of tasks for vertex " + executionJobVertex);
 					}
@@ -859,7 +940,7 @@ public class CheckpointCoordinator {
 
 		for (KeyGroupsStateHandle storedKeyGroup : allKeyGroupsHandles) {
 			KeyGroupsStateHandle intersection = storedKeyGroup.getKeyGroupIntersection(subtaskKeyGroupIds);
-			if(intersection.getNumberOfKeyGroups() > 0) {
+			if (intersection.getNumberOfKeyGroups() > 0) {
 				subtaskKeyGroupStates.add(intersection);
 			}
 		}
@@ -881,7 +962,7 @@ public class CheckpointCoordinator {
 	public static List<KeyGroupRange> createKeyGroupPartitions(int numberKeyGroups, int parallelism) {
 		Preconditions.checkArgument(numberKeyGroups >= parallelism);
 		List<KeyGroupRange> result = new ArrayList<>(parallelism);
-		int start = 0;
+
 		for (int i = 0; i < parallelism; ++i) {
 			result.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(numberKeyGroups, parallelism, i));
 		}
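
The redistribution step above reduces to one decision per operator in the chain: if the
parallelism changed, hand the collected OperatorStateHandles to the repartitioner;
otherwise repack them one-to-one so that every subtask receives exactly its previous
state back. A minimal sketch of that decision, assuming the types and imports introduced
in this commit (the method name assignStates and its surrounding plumbing are
hypothetical):

	// Sketch only: the redistribute-or-repack decision from the restore path above.
	static List<Collection<OperatorStateHandle>> assignStates(
			List<OperatorStateHandle> previousSubtaskStates,
			int oldParallelism,
			int newParallelism) {

		if (oldParallelism != newParallelism) {
			// parallelism changed: let the strategy split and merge the partitions
			return RoundRobinOperatorStateRepartitioner.INSTANCE
					.repartitionState(previousSubtaskStates, newParallelism);
		} else {
			// unchanged: each subtask simply receives its previous state back
			List<Collection<OperatorStateHandle>> repacking = new ArrayList<>(newParallelism);
			for (OperatorStateHandle handle : previousSubtaskStates) {
				repacking.add(Collections.singletonList(handle));
			}
			return repacking;
		}
	}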

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 7cb3916..0d279f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -153,9 +153,4 @@ public class CompletedCheckpoint implements StateObject {
 	public String toString() {
 		return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
 	}
-
-	@Override
-	public void close() throws IOException {
-		StateUtil.bestEffortCloseAllStateObjects(taskStates.values());
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
new file mode 100644
index 0000000..98810f1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Interface for implementing different strategies to repartition operator state as parallelism changes.
+ */
+public interface OperatorStateRepartitioner {
+
+	/**
+	 * @param previousParallelSubtaskStates List of state handles to the parallel subtask states of an operator, as they
+	 *                                      have been checkpointed.
+	 * @param parallelism                   The parallelism that we consider for the state redistribution. Determines the size of the
+	 *                                      returned list.
+	 * @return List with one entry per parallel subtask. Each subtask receives one collection of states that together
+	 * form the new total state for this subtask.
+	 */
+	List<Collection<OperatorStateHandle>> repartitionState(
+			List<OperatorStateHandle> previousParallelSubtaskStates,
+			int parallelism);
+}
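
The TODO in CheckpointCoordinator above mentions union state as a possible alternative
strategy. To make the contract of this interface concrete, here is a hedged sketch of
such a broadcast-style strategy (illustration only, not part of this commit;
UnionOperatorStateRepartitioner is a hypothetical name, imports as in the interface
above plus java.util.ArrayList):

	// Illustration: every new subtask receives all previous partitions.
	public class UnionOperatorStateRepartitioner implements OperatorStateRepartitioner {

		@Override
		public List<Collection<OperatorStateHandle>> repartitionState(
				List<OperatorStateHandle> previousParallelSubtaskStates,
				int parallelism) {

			List<Collection<OperatorStateHandle>> result = new ArrayList<>(parallelism);
			for (int i = 0; i < parallelism; ++i) {
				// each subtask sees the complete previous state
				result.add(new ArrayList<>(previousParallelSubtaskStates));
			}
			return result;
		}
	}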

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index d499a5a..2ca9d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -23,7 +23,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
@@ -167,49 +169,80 @@ public class PendingCheckpoint {
 	}
 	
 	public boolean acknowledgeTask(
-		ExecutionAttemptID attemptID,
-		ChainedStateHandle<StreamStateHandle> state,
-		List<KeyGroupsStateHandle> keyGroupsState) {
+			ExecutionAttemptID attemptID,
+			CheckpointStateHandles checkpointStateHandles) {
 
 		synchronized (lock) {
 			if (discarded) {
 				return false;
 			}
-			
-			ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID);
-			if (vertex != null) {
-				if (state != null || keyGroupsState != null) {
 
-					JobVertexID jobVertexID = vertex.getJobvertexId();
+			ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID);
 
-					TaskState taskState;
+			if (vertex != null) {
 
-					if (taskStates.containsKey(jobVertexID)) {
-						taskState = taskStates.get(jobVertexID);
-					} else {
-						taskState = new TaskState(jobVertexID, vertex.getTotalNumberOfParallelSubtasks(), vertex.getMaxParallelism());
-						taskStates.put(jobVertexID, taskState);
+				if (checkpointStateHandles != null) {
+					List<KeyGroupsStateHandle> keyGroupsState = checkpointStateHandles.getKeyGroupsStateHandle();
+					ChainedStateHandle<StreamStateHandle> nonPartitionedState =
+							checkpointStateHandles.getNonPartitionedStateHandles();
+					ChainedStateHandle<OperatorStateHandle> partitionableState =
+							checkpointStateHandles.getPartitioneableStateHandles();
+
+					if (nonPartitionedState != null || partitionableState != null || keyGroupsState != null) {
+
+						JobVertexID jobVertexID = vertex.getJobvertexId();
+
+						int subtaskIndex = vertex.getParallelSubtaskIndex();
+
+						TaskState taskState;
+
+						if (taskStates.containsKey(jobVertexID)) {
+							taskState = taskStates.get(jobVertexID);
+						} else {
+							//TODO this should go away when we remove chained state, assigning state to operators directly instead
+							int chainLength;
+							if (nonPartitionedState != null) {
+								chainLength = nonPartitionedState.getLength();
+							} else if (partitionableState != null) {
+								chainLength = partitionableState.getLength();
+							} else {
+								chainLength = 1;
+							}
+
+							taskState = new TaskState(
+								jobVertexID,
+								vertex.getTotalNumberOfParallelSubtasks(),
+								vertex.getMaxParallelism(),
+								chainLength);
+
+							taskStates.put(jobVertexID, taskState);
+						}
+
+						long duration = System.currentTimeMillis() - checkpointTimestamp;
+
+						if (nonPartitionedState != null) {
+							taskState.putState(
+									subtaskIndex,
+									new SubtaskState(nonPartitionedState, duration));
+						}
+
+						if (partitionableState != null && !partitionableState.isEmpty()) {
+							taskState.putPartitionableState(subtaskIndex, partitionableState);
+						}
+
+						// currently a checkpoint can only contain keyed state
+						// for the head operator
+						if (keyGroupsState != null && !keyGroupsState.isEmpty()) {
+							KeyGroupsStateHandle keyGroupsStateHandle = keyGroupsState.get(0);
+							taskState.putKeyedState(subtaskIndex, keyGroupsStateHandle);
+						}
 					}
+				}
 
-					long duration = System.currentTimeMillis() - checkpointTimestamp;
+				++numAcknowledgedTasks;
 
-					if (state != null) {
-						taskState.putState(
-							vertex.getParallelSubtaskIndex(),
-							new SubtaskState(state, duration));
-					}
-
-					// currently a checkpoint can only contain keyed state
-					// for the head operator
-					if (keyGroupsState != null && !keyGroupsState.isEmpty()) {
-						KeyGroupsStateHandle keyGroupsStateHandle = keyGroupsState.get(0);
-						taskState.putKeyedState(vertex.getParallelSubtaskIndex(), keyGroupsStateHandle);
-					}
-				}
-				numAcknowledgedTasks++;
 				return true;
-			}
-			else {
+			} else {
 				return false;
 			}
 		}
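
Seen end to end, the new signature means all three kinds of state travel in one
CheckpointStateHandles container, which the coordinator ultimately passes to
acknowledgeTask. A hedged sketch of assembling such a container (variable names are
hypothetical; any of the three parts may be null when a subtask produced no state of
that kind):

	// Sketch: the three kinds of state bundled into the new container.
	CheckpointStateHandles handles = new CheckpointStateHandles(
			nonPartitionedState,    // ChainedStateHandle<StreamStateHandle>
			partitionableState,     // ChainedStateHandle<OperatorStateHandle>
			keyGroupStateHandles);  // List<KeyGroupsStateHandle>

	pendingCheckpoint.acknowledgeTask(attemptID, handles);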

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
new file mode 100644
index 0000000..09a35f6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Current default implementation of {@link OperatorStateRepartitioner} that redistributes state in round robin fashion.
+ */
+public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepartitioner {
+
+	public static final OperatorStateRepartitioner INSTANCE = new RoundRobinOperatorStateRepartitioner();
+	private static final boolean OPTIMIZE_MEMORY_USE = false;
+
+	@Override
+	public List<Collection<OperatorStateHandle>> repartitionState(
+			List<OperatorStateHandle> previousParallelSubtaskStates,
+			int parallelism) {
+
+		Preconditions.checkNotNull(previousParallelSubtaskStates);
+		Preconditions.checkArgument(parallelism > 0);
+
+		// Reorganize: group by (State Name -> StreamStateHandle + Offsets)
+		Map<String, List<Tuple2<StreamStateHandle, long[]>>> nameToState =
+				groupByStateName(previousParallelSubtaskStates);
+
+		if (OPTIMIZE_MEMORY_USE) {
+			previousParallelSubtaskStates.clear(); // free for GC, at the cost that the old handles are no longer available
+		}
+
+		// Assemble result from all merge maps
+		List<Collection<OperatorStateHandle>> result = new ArrayList<>(parallelism);
+
+		// Do the actual repartitioning for all named states
+		List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList =
+				repartition(nameToState, parallelism);
+
+		for (int i = 0; i < mergeMapList.size(); ++i) {
+			result.add(i, new ArrayList<>(mergeMapList.get(i).values()));
+		}
+
+		return result;
+	}
+
+	/**
+	 * Group by the different named states.
+	 */
+	private Map<String, List<Tuple2<StreamStateHandle, long[]>>> groupByStateName(
+			List<OperatorStateHandle> previousParallelSubtaskStates) {
+
+		//Reorganize: group by (State Name -> StreamStateHandle + Offsets)
+		Map<String, List<Tuple2<StreamStateHandle, long[]>>> nameToState = new HashMap<>();
+		for (OperatorStateHandle psh : previousParallelSubtaskStates) {
+
+			for (Map.Entry<String, long[]> e : psh.getStateNameToPartitionOffsets().entrySet()) {
+
+				List<Tuple2<StreamStateHandle, long[]>> stateLocations = nameToState.get(e.getKey());
+
+				if (stateLocations == null) {
+					stateLocations = new ArrayList<>();
+					nameToState.put(e.getKey(), stateLocations);
+				}
+
+				stateLocations.add(new Tuple2<>(psh.getDelegateStateHandle(), e.getValue()));
+			}
+		}
+		return nameToState;
+	}
+
+	/**
+	 * Repartition all named states.
+	 */
+	private List<Map<StreamStateHandle, OperatorStateHandle>> repartition(
+			Map<String, List<Tuple2<StreamStateHandle, long[]>>> nameToState, int parallelism) {
+
+		// We will use this to merge w.r.t. StreamStateHandles for each parallel subtask inside the maps
+		List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList = new ArrayList<>(parallelism);
+		// Initialize
+		for (int i = 0; i < parallelism; ++i) {
+			mergeMapList.add(new HashMap<StreamStateHandle, OperatorStateHandle>());
+		}
+
+		int startParallelOp = 0;
+		// Iterate all named states and repartition one named state at a time per iteration
+		for (Map.Entry<String, List<Tuple2<StreamStateHandle, long[]>>> e : nameToState.entrySet()) {
+
+			List<Tuple2<StreamStateHandle, long[]>> current = e.getValue();
+
+			// Determine actual number of partitions for this named state
+			int totalPartitions = 0;
+			for (Tuple2<StreamStateHandle, long[]> offsets : current) {
+				totalPartitions += offsets.f1.length;
+			}
+
+			// Repartition the state across the parallel operator instances
+			int lstIdx = 0;
+			int offsetIdx = 0;
+			int baseFraction = totalPartitions / parallelism;
+			int remainder = totalPartitions % parallelism;
+
+			int newStartParallelOp = startParallelOp;
+
+			for (int i = 0; i < parallelism; ++i) {
+
+				// Preparation: calculate the actual index considering wrap around
+				int parallelOpIdx = (i + startParallelOp) % parallelism;
+
+				// Now calculate the number of partitions we will assign to the parallel instance in this round ...
+				int numberOfPartitionsToAssign = baseFraction;
+
+				// ... and distribute odd partitions while we still have some, one at a time
+				if (remainder > 0) {
+					++numberOfPartitionsToAssign;
+					--remainder;
+				} else if (remainder == 0) {
+					// We are out of odd partitions now and begin our next redistribution round with the current
+					// parallel operator to ensure fair load balance
+					newStartParallelOp = parallelOpIdx;
+					--remainder; // drops below zero so that this branch executes only once
+				}
+
+				// Now start collecting the partitions for the parallel instance into this list
+				List<Tuple2<StreamStateHandle, long[]>> parallelOperatorState = new ArrayList<>();
+
+				while (numberOfPartitionsToAssign > 0) {
+					Tuple2<StreamStateHandle, long[]> handleWithOffsets = current.get(lstIdx);
+					long[] offsets = handleWithOffsets.f1;
+					int remaining = offsets.length - offsetIdx;
+					// Repartition offsets
+					long[] offs;
+					if (remaining > numberOfPartitionsToAssign) {
+						offs = Arrays.copyOfRange(offsets, offsetIdx, offsetIdx + numberOfPartitionsToAssign);
+						offsetIdx += numberOfPartitionsToAssign;
+					} else {
+						if (OPTIMIZE_MEMORY_USE) {
+							handleWithOffsets.f1 = null; // GC
+						}
+						offs = Arrays.copyOfRange(offsets, offsetIdx, offsets.length);
+						offsetIdx = 0;
+						++lstIdx;
+					}
+
+					parallelOperatorState.add(
+							new Tuple2<>(handleWithOffsets.f0, offs));
+
+					numberOfPartitionsToAssign -= remaining;
+
+					// As a last step we merge partitions that use the same StreamStateHandle into a single
+					// OperatorStateHandle
+					Map<StreamStateHandle, OperatorStateHandle> mergeMap = mergeMapList.get(parallelOpIdx);
+					OperatorStateHandle psh = mergeMap.get(handleWithOffsets.f0);
+					if (psh == null) {
+						psh = new OperatorStateHandle(handleWithOffsets.f0, new HashMap<String, long[]>());
+						mergeMap.put(handleWithOffsets.f0, psh);
+					}
+					psh.getStateNameToPartitionOffsets().put(e.getKey(), offs);
+				}
+			}
+			startParallelOp = newStartParallelOp;
+			e.setValue(null); // this named state is fully redistributed; allow GC of the grouped handles
+		}
+		return mergeMapList;
+	}
+}
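
The distribution arithmetic is easiest to see with concrete numbers. For a named state
with 7 partitions in total and a new parallelism of 3, baseFraction is 7 / 3 = 2 and
remainder is 7 % 3 = 1, so the three subtasks receive 3, 2, and 2 partitions, and the
next named state starts its round at the subtask where the remainder ran out, balancing
the load across many small states. A self-contained sketch of just this counting step
(a hypothetical extraction, not part of the commit):

	// How many partitions of one named state each parallel subtask receives.
	static int[] partitionCounts(int totalPartitions, int parallelism) {
		int base = totalPartitions / parallelism;
		int remainder = totalPartitions % parallelism;
		int[] counts = new int[parallelism];
		for (int i = 0; i < parallelism; ++i) {
			counts[i] = base + (remainder > 0 ? 1 : 0);
			if (remainder > 0) {
				--remainder;
			}
		}
		return counts;
	}

	// partitionCounts(7, 3) -> {3, 2, 2}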

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
index 9beb233..2aa0491 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
@@ -24,8 +24,6 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -112,11 +110,4 @@ public class SubtaskState implements StateObject {
 	public String toString() {
 		return String.format("SubtaskState(Size: %d, Duration: %d, State: %s)", stateSize, duration, chainedStateHandle);
 	}
-
-	@Override
-	public void close() throws IOException {
-		chainedStateHandle.close();
-	}
-
-
 }