You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/03/16 17:34:29 UTC

[4/4] flink git commit: [FLINK-5715] Asynchronous snapshots for heap-based keyed state backend

[FLINK-5715] Asynchronous snapshots for heap-based keyed state backend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab014ef9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab014ef9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab014ef9

Branch: refs/heads/master
Commit: ab014ef94e0e9137ac6f8f41dae385ff71e8ba5b
Parents: 30bb958
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Mar 3 10:51:15 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Mar 16 18:34:02 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |    5 +
 .../state/RocksDBStateBackendTest.java          |   86 +-
 .../java/org/apache/flink/util/MathUtils.java   |   35 +-
 .../filesystem/AbstractFsStateSnapshot.java     |   30 +-
 .../state/memory/AbstractMemStateSnapshot.java  |   40 +-
 .../AbstractMigrationRestoreStrategy.java       |  117 ++
 .../state/memory/MigrationRestoreSnapshot.java  |   32 +
 .../state/AbstractKeyedStateBackend.java        |    7 +
 .../runtime/state/ArrayListSerializer.java      |   14 +-
 .../state/KeyedBackendSerializationProxy.java   |   23 +-
 .../flink/runtime/state/KeyedStateBackend.java  |   28 +-
 .../state/StateTransformationFunction.java      |   23 +
 .../state/filesystem/FsStateBackend.java        |  109 +-
 .../state/heap/AbstractHeapMergingState.java    |   93 +-
 .../runtime/state/heap/AbstractHeapState.java   |   84 +-
 .../state/heap/AbstractStateTableSnapshot.java  |   51 +
 .../state/heap/CopyOnWriteStateTable.java       | 1066 ++++++++++++++++++
 .../heap/CopyOnWriteStateTableSnapshot.java     |  188 +++
 .../state/heap/HeapAggregatingState.java        |   92 +-
 .../runtime/state/heap/HeapFoldingState.java    |   70 +-
 .../state/heap/HeapKeyedStateBackend.java       |  426 +++----
 .../flink/runtime/state/heap/HeapListState.java |   63 +-
 .../flink/runtime/state/heap/HeapMapState.java  |  167 +--
 .../runtime/state/heap/HeapReducingState.java   |   82 +-
 .../runtime/state/heap/HeapValueState.java      |   47 +-
 .../runtime/state/heap/InternalKeyContext.java  |   60 +
 .../state/heap/NestedMapsStateTable.java        |  363 ++++++
 .../flink/runtime/state/heap/StateEntry.java    |   44 +
 .../flink/runtime/state/heap/StateTable.java    |  222 ++--
 .../state/heap/StateTableByKeyGroupReader.java  |   38 +
 .../state/heap/StateTableByKeyGroupReaders.java |  136 +++
 .../runtime/state/heap/StateTableSnapshot.java  |   45 +
 .../state/memory/MemoryStateBackend.java        |   26 +
 .../runtime/query/QueryableStateClientTest.java |    7 +-
 .../message/KvStateRequestSerializerTest.java   |   38 +-
 .../state/AsyncFileStateBackendTest.java        |   27 +
 .../state/AsyncMemoryStateBackendTest.java      |   27 +
 .../runtime/state/FileStateBackendTest.java     |    8 +-
 .../runtime/state/MemoryStateBackendTest.java   |    8 +-
 .../runtime/state/StateBackendTestBase.java     |  216 +++-
 .../state/heap/CopyOnWriteStateTableTest.java   |  486 ++++++++
 .../state/heap/HeapAggregatingStateTest.java    |   21 +-
 ...pKeyedStateBackendSnapshotMigrationTest.java |  173 +++
 .../runtime/state/heap/HeapListStateTest.java   |   17 +-
 .../state/heap/HeapReducingStateTest.java       |   23 +-
 .../state/heap/HeapStateBackendTestBase.java    |   54 +
 .../StateTableSnapshotCompatibilityTest.java    |  118 ++
 .../util/BlockerCheckpointStreamFactory.java    |  118 ++
 .../heap_keyed_statebackend_1_2.snapshot        |  Bin 0 -> 2068 bytes
 .../api/windowing/windows/TimeWindow.java       |    5 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   64 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   26 +
 ...ckendEventTimeWindowCheckpointingITCase.java |   26 +
 ...ckendEventTimeWindowCheckpointingITCase.java |   20 +
 .../test/state/ManualWindowSpeedITCase.java     |    7 +-
 55 files changed, 4239 insertions(+), 1162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index aaccc2f..f585d21 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1219,4 +1219,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			// expected
 		}
 	}
+
+	@Override
+	public boolean supportsAsynchronousSnapshots() {
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index c7b5c20..708613b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -31,14 +31,13 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StateBackendTestBase;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -364,89 +363,6 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 		assertEquals(null, keyedStateBackend.db);
 	}
 
-	static class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
-
-		private final int maxSize;
-		private int afterNumberInvocations;
-		private OneShotLatch blocker;
-		private OneShotLatch waiter;
-
-		MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
-
-		public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
-			return lastCreatedStream;
-		}
-
-		public BlockerCheckpointStreamFactory(int maxSize) {
-			this.maxSize = maxSize;
-		}
-
-		public void setAfterNumberInvocations(int afterNumberInvocations) {
-			this.afterNumberInvocations = afterNumberInvocations;
-		}
-
-		public void setBlockerLatch(OneShotLatch latch) {
-			this.blocker = latch;
-		}
-
-		public void setWaiterLatch(OneShotLatch latch) {
-			this.waiter = latch;
-		}
-
-		@Override
-		public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
-			waiter.trigger();
-			this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
-
-				private int afterNInvocations = afterNumberInvocations;
-				private final OneShotLatch streamBlocker = blocker;
-				private final OneShotLatch streamWaiter = waiter;
-
-				@Override
-				public void write(int b) throws IOException {
-
-					if (afterNInvocations > 0) {
-						--afterNInvocations;
-					}
-
-					if (0 == afterNInvocations && null != streamBlocker) {
-						try {
-							streamBlocker.await();
-						} catch (InterruptedException ignored) {
-						}
-					}
-					try {
-						super.write(b);
-					} catch (IOException ex) {
-						if (null != streamWaiter) {
-							streamWaiter.trigger();
-						}
-						throw ex;
-					}
-
-					if (0 == afterNInvocations && null != streamWaiter) {
-						streamWaiter.trigger();
-					}
-				}
-
-				@Override
-				public void close() {
-					super.close();
-					if (null != streamWaiter) {
-						streamWaiter.trigger();
-					}
-				}
-			};
-
-			return lastCreatedStream;
-		}
-
-		@Override
-		public void close() throws Exception {
-
-		}
-	}
-
 	private static class AcceptAllFilter implements IOFileFilter {
 		@Override
 		public boolean accept(File file) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index 074e8ae..1d84a39 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -140,11 +140,7 @@ public final class MathUtils {
 		code = code * 5 + 0xe6546b64;
 
 		code ^= 4;
-		code ^= code >>> 16;
-		code *= 0x85ebca6b;
-		code ^= code >>> 13;
-		code *= 0xc2b2ae35;
-		code ^= code >>> 16;
+		code = bitMix(code);
 
 		if (code >= 0) {
 			return code;
@@ -172,6 +168,35 @@ public final class MathUtils {
 		return x + 1;
 	}
 
+	/**
+	 * Pseudo-randomly maps a long (64-bit) to an integer (32-bit) by bit-mixing all 64 bits and keeping the low 32.
+	 *
+	 * @param in the long (64-bit) input.
+	 * @return the low 32 bits of the bit-mixed input.
+	 */
+	public static int longToIntWithBitMixing(long in) {
+		in = (in ^ (in >>> 30)) * 0xbf58476d1ce4e5b9L;
+		in = (in ^ (in >>> 27)) * 0x94d049bb133111ebL;
+		in = in ^ (in >>> 31);
+		return (int) in;
+	}
+
+	/**
+	 * Bit-mixing for pseudo-randomization of integers (e.g., to guard against bad hash functions). Implementation is
+	 * the 32-bit finalizer (fmix32) of MurmurHash3.
+	 *
+	 * @param in the input value.
+	 * @return the bit-mixed output value.
+	 */
+	public static int bitMix(int in) {
+		in ^= in >>> 16;
+		in *= 0x85ebca6b;
+		in ^= in >>> 13;
+		in *= 0xc2b2ae35;
+		in ^= in >>> 16;
+		return in;
+	}
+
 	// ============================================================================================
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
index 103c214..a15e49d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
@@ -21,8 +21,16 @@ package org.apache.flink.migration.runtime.state.filesystem;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.memory.AbstractMigrationRestoreStrategy;
+import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
 
 import java.io.IOException;
 
@@ -36,7 +44,7 @@ import java.io.IOException;
 @Deprecated
 @SuppressWarnings("deprecation")
 public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> 
-		extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD> {
+		extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD>, MigrationRestoreSnapshot<K, N, SV> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -85,4 +93,24 @@ public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD exte
 	public SD getStateDesc() {
 		return stateDesc;
 	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public StateTable<K, N, SV> deserialize(
+			String stateName,
+			HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+		final FileSystem fs = getFilePath().getFileSystem();
+		try (FSDataInputStream inStream = fs.open(getFilePath())) {
+			final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
+			AbstractMigrationRestoreStrategy<K, N, SV> restoreStrategy =
+					new AbstractMigrationRestoreStrategy<K, N, SV>(keySerializer, namespaceSerializer, stateSerializer) {
+						@Override
+						protected DataInputView openDataInputView() throws IOException {
+							return inView;
+						}
+					};
+			return restoreStrategy.deserialize(stateName, stateBackend);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
index 6056578..ff86f7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
@@ -21,17 +21,18 @@ package org.apache.flink.migration.runtime.state.memory;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 
 @Deprecated
 @SuppressWarnings("deprecation")
 public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> 
-		implements KvStateSnapshot<K, N, S, SD> {
+		implements KvStateSnapshot<K, N, S, SD>, MigrationRestoreSnapshot<K, N, SV> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -73,24 +74,21 @@ public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD ext
 		this.data = data;
 	}
 
-	public HashMap<N, Map<K, SV>> deserialize() throws IOException {
-		DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length);
-
-		final int numKeys = inView.readInt();
-		HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);
-
-		for (int i = 0; i < numKeys && !closed; i++) {
-			N namespace = namespaceSerializer.deserialize(inView);
-			final int numValues = inView.readInt();
-			Map<K, SV> namespaceMap = new HashMap<>(numValues);
-			stateMap.put(namespace, namespaceMap);
-			for (int j = 0; j < numValues; j++) {
-				K key = keySerializer.deserialize(inView);
-				SV value = stateSerializer.deserialize(inView);
-				namespaceMap.put(key, value);
-			}
-		}
-		return stateMap;
+	@Override
+	@SuppressWarnings("unchecked")
+	public StateTable<K, N, SV> deserialize(
+			String stateName,
+			HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+		final DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length);
+		AbstractMigrationRestoreStrategy<K, N, SV> restoreStrategy =
+				new AbstractMigrationRestoreStrategy<K, N, SV>(keySerializer, namespaceSerializer, stateSerializer) {
+					@Override
+					protected DataInputView openDataInputView() throws IOException {
+						return inView;
+					}
+				};
+		return restoreStrategy.deserialize(stateName, stateBackend);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
new file mode 100644
index 0000000..e572619
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Restores a legacy (Flink 1.1) keyed-state snapshot into a heap {@link StateTable}; subclasses only supply the input.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+@Deprecated
+public abstract class AbstractMigrationRestoreStrategy<K, N, S> implements MigrationRestoreSnapshot<K, N, S> {
+
+	/**
+	 * Serializer for the keys.
+	 */
+	protected final TypeSerializer<K> keySerializer;
+
+	/**
+	 * Serializer for the namespaces (possibly a {@link VoidSerializer}).
+	 */
+	protected final TypeSerializer<N> namespaceSerializer;
+
+	/**
+	 * Serializer for the state values.
+	 */
+	protected final TypeSerializer<S> stateSerializer;
+
+	public AbstractMigrationRestoreStrategy(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<S> stateSerializer) {
+
+		this.keySerializer = Preconditions.checkNotNull(keySerializer);
+		this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
+		this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public StateTable<K, N, S> deserialize(String stateName, HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+		Preconditions.checkNotNull(stateName, "State name is null. Cannot deserialize snapshot.");
+		Preconditions.checkNotNull(stateBackend, "State backend is null. Cannot deserialize snapshot.");
+
+		final KeyGroupRange keyGroupRange = stateBackend.getKeyGroupRange();
+		Preconditions.checkState(1 == keyGroupRange.getNumberOfKeyGroups(),
+				"Unexpected number of key-groups for restoring from Flink 1.1");
+
+		// the restored state is registered with VoidNamespaceSerializer instead of the legacy VoidSerializer
+		final TypeSerializer<N> patchedNamespaceSerializer = (namespaceSerializer instanceof VoidSerializer) ?
+				(TypeSerializer<N>) VoidNamespaceSerializer.INSTANCE : namespaceSerializer;
+
+		RegisteredBackendStateMetaInfo<N, S> registeredBackendStateMetaInfo =
+				new RegisteredBackendStateMetaInfo<>(
+						StateDescriptor.Type.UNKNOWN,
+						stateName,
+						patchedNamespaceSerializer,
+						stateSerializer);
+
+		final StateTable<K, N, S> stateTable = stateBackend.newStateTable(registeredBackendStateMetaInfo);
+		final DataInputView inView = openDataInputView();
+		final int keyGroup = keyGroupRange.getStartKeyGroup();
+		final int numNamespaces = inView.readInt();
+
+		for (int i = 0; i < numNamespaces; i++) {
+			// a null namespace (e.g. read via VoidSerializer) is replaced by VoidNamespace.INSTANCE
+			N namespace = namespaceSerializer.deserialize(inView);
+			if (null == namespace) {
+				namespace = (N) VoidNamespace.INSTANCE;
+			}
+			final int numKV = inView.readInt();
+			for (int j = 0; j < numKV; j++) {
+				K key = keySerializer.deserialize(inView);
+				S value = stateSerializer.deserialize(inView);
+				stateTable.put(key, keyGroup, namespace, value);
+			}
+		}
+		return stateTable;
+	}
+
+	/**
+	 * Different state handles require different code to end up with a {@link DataInputView}.
+	 */
+	protected abstract DataInputView openDataInputView() throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
new file mode 100644
index 0000000..ea529db
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.util.Migration;
+
+import java.io.IOException;
+
+@Deprecated
+@Internal
+public interface MigrationRestoreSnapshot<K, N, S> extends Migration {
+	StateTable<K, N, S> deserialize(String stateName, HeapKeyedStateBackend<K> stateBackend) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index aba00f3..1f2f4a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
@@ -254,6 +255,7 @@ public abstract class AbstractKeyedStateBackend<K>
 	/**
 	 * @see KeyedStateBackend
 	 */
+	@Override
 	public KeyGroupRange getKeyGroupRange() {
 		return keyGroupRange;
 	}
@@ -382,4 +384,9 @@ public abstract class AbstractKeyedStateBackend<K>
 	public void close() throws IOException {
 		cancelStreamRegistry.close();
 	}
+
+	@VisibleForTesting
+	public boolean supportsAsynchronousSnapshots() {
+		return false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index f5a6405..0badb41 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -57,11 +57,17 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
 
 	@Override
 	public ArrayList<T> copy(ArrayList<T> from) {
-		ArrayList<T> newList = new ArrayList<>(from.size());
-		for (int i = 0; i < from.size(); i++) {
-			newList.add(elementSerializer.copy(from.get(i)));
+		if (elementSerializer.isImmutableType()) {
+			// immutable elements can be shared, so a shallow copy of the list suffices
+			return new ArrayList<>(from);
+		} else {
+			// mutable elements must be copied one by one with the element serializer
+			ArrayList<T> newList = new ArrayList<>(from.size());
+			for (int i = 0; i < from.size(); i++) {
+				newList.add(elementSerializer.copy(from.get(i)));
+			}
+			return newList;
 		}
-		return newList;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index dbee6cb..5661c38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionMismatchException;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -37,11 +38,12 @@ import java.util.List;
  */
 public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
 
-	private static final int VERSION = 1;
+	public static final int VERSION = 2;
 
 	private TypeSerializerSerializationProxy<?> keySerializerProxy;
 	private List<StateMetaInfo<?, ?>> namedStateSerializationProxies;
 
+	private int restoredVersion;
 	private ClassLoader userCodeClassLoader;
 
 	public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
@@ -51,6 +53,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 	public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, List<StateMetaInfo<?, ?>> namedStateSerializationProxies) {
 		this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
 		this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
+		this.restoredVersion = VERSION;
 		Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
 	}
 
@@ -67,6 +70,22 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 		return VERSION;
 	}
 
+	public int getRestoredVersion() {
+		return restoredVersion;
+	}
+
+	@Override
+	protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
+		super.resolveVersionRead(foundVersion);
+		this.restoredVersion = foundVersion;
+	}
+
+	@Override
+	public boolean isCompatibleVersion(int version) {
+		// we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x)
+		return super.isCompatibleVersion(version) || version == 1;
+	}
+
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		super.write(out);
@@ -96,7 +115,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 		}
 	}
 
-//----------------------------------------------------------------------------------------------------------------------
+	//----------------------------------------------------------------------------------------------------------------------
 
 	/**
 	 * This is the serialization proxy for {@link RegisteredBackendStateMetaInfo} for a single registered state in a

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 15e0491..09e27e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -21,13 +21,14 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
 
 /**
  * A keyed state backend provides methods for managing keyed state.
  *
  * @param <K> The key by which state is keyed.
  */
-public interface KeyedStateBackend<K> {
+public interface KeyedStateBackend<K> extends InternalKeyContext<K> {
 
 	/**
 	 * Sets the current key that is used for partitioned state.
@@ -36,31 +37,6 @@ public interface KeyedStateBackend<K> {
 	void setCurrentKey(K newKey);
 
 	/**
-	 * Used by states to access the current key.
-	 */
-	K getCurrentKey();
-
-	/**
-	 * Returns the key-group to which the current key belongs.
-	 */
-	int getCurrentKeyGroupIndex();
-
-	/**
-	 * Returns the number of key-groups aka max parallelism.
-	 */
-	int getNumberOfKeyGroups();
-
-	/**
-	 * Returns the key groups for this backend.
-	 */
-	KeyGroupsList getKeyGroupRange();
-
-	/**
-	 * {@link TypeSerializer} for the state backend key type.
-	 */
-	TypeSerializer<K> getKeySerializer();
-
-	/**
 	 * Creates or retrieves a keyed state backed by this state backend.
 	 *
 	 * @param namespaceSerializer The serializer used for the namespace type of the state

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
new file mode 100644
index 0000000..9e12ee5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+public interface StateTransformationFunction<S, T> { // computes a new state value from a previous state value and an input value
+	S apply(S previousState, T value) throws Exception; // presumably the result replaces previousState in the table; previousState may be null (cf. MergeTransformation) — confirm against StateTable#transform
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 2e9198f..e27712c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -66,7 +66,10 @@ public class FsStateBackend extends AbstractStateBackend {
 
 	/** State below this size will be stored as part of the metadata, rather than in files */
 	private final int fileStateThreshold;
-	
+
+	/** Switch to choose between synchronous and asynchronous snapshots. */
+	private final boolean asynchronousSnapshots;
+
 	/**
 	 * Creates a new state backend that stores its checkpoint data in the file system and location
 	 * defined by the given URI.
@@ -99,6 +102,27 @@ public class FsStateBackend extends AbstractStateBackend {
 	 *
 	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
 	 *                          and the path to the checkpoint data directory.
+	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+	 *
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 */
+	public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+		this(new Path(checkpointDataUri), asynchronousSnapshots);
+	}
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
 	public FsStateBackend(Path checkpointDataUri) throws IOException {
@@ -118,10 +142,52 @@ public class FsStateBackend extends AbstractStateBackend {
 	 *
 	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
 	 *                          and the path to the checkpoint data directory.
+	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+	 *
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 */
+	public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+		this(checkpointDataUri.toUri(), asynchronousSnapshots);
+	}
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
 	public FsStateBackend(URI checkpointDataUri) throws IOException {
-		this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD);
+		this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, false);
+	}
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
+	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+	 *
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 */
+	public FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+		this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, asynchronousSnapshots);
 	}
 
 	/**
@@ -139,17 +205,47 @@ public class FsStateBackend extends AbstractStateBackend {
 	 *                          and the path to the checkpoint data directory.
 	 * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
 	 *                             rather than in files
-	 * 
+	 *
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds.
 	 */
 	public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException {
+
+		this(checkpointDataUri, fileStateSizeThreshold, false);
+	}
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
+	 * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
+	 *                             rather than in files
+	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds.
+	 */
+	public FsStateBackend(
+			URI checkpointDataUri,
+			int fileStateSizeThreshold,
+			boolean asynchronousSnapshots) throws IOException {
+
 		checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be zero or larger.");
-		checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD, 
+		checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
 				"The threshold for file state size cannot be larger than %s", MAX_FILE_STATE_THRESHOLD);
 
 		this.fileStateThreshold = fileStateSizeThreshold;
 		this.basePath = validateAndNormalizeUri(checkpointDataUri);
+
+		this.asynchronousSnapshots = asynchronousSnapshots;
 	}
 
 	/**
@@ -166,9 +262,9 @@ public class FsStateBackend extends AbstractStateBackend {
 	 * Gets the threshold below which state is stored as part of the metadata, rather than in files.
 	 * This threshold ensures that the backend does not create a large amount of very small files,
 	 * where potentially the file pointers are larger than the state itself.
-	 * 
+	 *
 	 * <p>By default, this threshold is {@value #DEFAULT_FILE_STATE_THRESHOLD}.
-	 * 
+	 *
 	 * @return The file size threshold, in bytes.
 	 */
 	public int getMinFileSizeThreshold() {
@@ -209,6 +305,7 @@ public class FsStateBackend extends AbstractStateBackend {
 				env.getUserClassLoader(),
 				numberOfKeyGroups,
 				keyGroupRange,
+				asynchronousSnapshots,
 				env.getExecutionConfig());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
index 4ac7125..3e76423 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
@@ -22,18 +22,15 @@ import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalMergingState;
 
 import java.util.Collection;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Base class for {@link MergingState} ({@link org.apache.flink.runtime.state.internal.InternalMergingState})
  * that is stored on the heap.
- * 
+ *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <SV> The type of the values in the state.
@@ -45,21 +42,25 @@ public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends Stat
 		implements InternalMergingState<N, IN, OUT> {
 
 	/**
+	 * The merge transformation function that implements the merge logic.
+	 */
+	private final MergeTransformation mergeTransformation;
+
+	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param backend The state backend backing that created this state.
 	 * @param stateDesc The state identifier for the state. This contains name
 	 *                           and can create a default state value.
 	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
 	 */
 	protected AbstractHeapMergingState(
-			KeyedStateBackend<K> backend,
 			SD stateDesc,
 			StateTable<K, N, SV> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
 
-		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+		this.mergeTransformation = new MergeTransformation();
 	}
 
 	@Override
@@ -68,56 +69,40 @@ public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends Stat
 			return; // nothing to do
 		}
 
-		final K key = backend.getCurrentKey();
-		checkState(key != null, "No key set.");
-
-		final Map<N, Map<K, SV>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
-
-		if (namespaceMap != null) {
-			SV merged = null;
-
-			// merge the sources
-			for (N source : sources) {
-				Map<K, SV> keysForNamespace = namespaceMap.get(source);
-				if (keysForNamespace != null) {
-					// get and remove the next source per namespace/key
-					SV sourceState = keysForNamespace.remove(key);
-
-					// if the namespace map became empty, remove 
-					if (keysForNamespace.isEmpty()) {
-						namespaceMap.remove(source);
-					}
-
-					if (merged != null && sourceState != null) {
-						merged = mergeState(merged, sourceState);
-					}
-					else if (merged == null) {
-						merged = sourceState;
-					}
-				}
-			}
+		final StateTable<K, N, SV> map = stateTable;
+
+		SV merged = null;
+
+		// merge the sources
+		for (N source : sources) {
 
-			// merge into the target, if needed
-			if (merged != null) {
-				Map<K, SV> keysForTarget = namespaceMap.get(target);
-				if (keysForTarget == null) {
-					keysForTarget = createNewMap();
-					namespaceMap.put(target, keysForTarget);
-				}
-				SV targetState = keysForTarget.get(key);
-
-				if (targetState != null) {
-					targetState = mergeState(targetState, merged);
-				}
-				else {
-					targetState = merged;
-				}
-				keysForTarget.put(key, targetState);
+			// get and remove the next source per namespace/key
+			SV sourceState = map.removeAndGetOld(source);
+
+			if (merged != null && sourceState != null) {
+				merged = mergeState(merged, sourceState);
+			} else if (merged == null) {
+				merged = sourceState;
 			}
 		}
 
-		// else no entries for that key at all, nothing to do skip
+		// merge into the target, if needed
+		if (merged != null) {
+			map.transform(target, merged, mergeTransformation);
+		}
 	}
 
 	protected abstract SV mergeState(SV a, SV b) throws Exception;
-}
+
+	final class MergeTransformation implements StateTransformationFunction<SV, SV> { // non-static on purpose: calls the enclosing state's mergeState(...); used via StateTable#transform in mergeNamespaces
+
+		@Override
+		public SV apply(SV targetState, SV merged) throws Exception {
+			if (targetState != null) {
+				return mergeState(targetState, merged);
+			} else {
+				return merged; // no existing value for the target namespace: use the merged sources as-is
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 18b71de..7e1123d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.heap;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
@@ -25,18 +26,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.util.Preconditions;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * Base class for partitioned {@link ListState} implementations that are backed by a regular
  * heap hash map. The concrete implementations define how the state is checkpointed.
- * 
+ *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <SV> The type of the values in the state.
@@ -53,9 +48,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
 	protected final SD stateDesc;
 
 	/** The current namespace, which the access methods will refer to. */
-	protected N currentNamespace = null;
-
-	protected final KeyedStateBackend<K> backend;
+	protected N currentNamespace;
 
 	protected final TypeSerializer<K> keySerializer;
 
@@ -64,58 +57,28 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param backend The state backend backing that created this state.
 	 * @param stateDesc The state identifier for the state. This contains name
 	 *                           and can create a default state value.
 	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
 	 */
 	protected AbstractHeapState(
-			KeyedStateBackend<K> backend,
 			SD stateDesc,
 			StateTable<K, N, SV> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
 
-		Preconditions.checkNotNull(stateTable, "State table must not be null.");
-
-		this.backend = backend;
 		this.stateDesc = stateDesc;
-		this.stateTable = stateTable;
+		this.stateTable = Preconditions.checkNotNull(stateTable, "State table must not be null.");
 		this.keySerializer = keySerializer;
 		this.namespaceSerializer = namespaceSerializer;
+		this.currentNamespace = null;
 	}
 
 	// ------------------------------------------------------------------------
 
 	@Override
 	public final void clear() {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
-		Map<N, Map<K, SV>> namespaceMap =
-				stateTable.get(backend.getCurrentKeyGroupIndex());
-
-		if (namespaceMap == null) {
-			return;
-		}
-
-		Map<K, SV> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			return;
-		}
-
-		SV removed = keyedMap.remove(backend.getCurrentKey());
-
-		if (removed == null) {
-			return;
-		}
-
-		if (!keyedMap.isEmpty()) {
-			return;
-		}
-
-		namespaceMap.remove(currentNamespace);
+		stateTable.remove(currentNamespace);
 	}
 
 	@Override
@@ -137,20 +100,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
 		Preconditions.checkState(namespace != null, "No namespace given.");
 		Preconditions.checkState(key != null, "No key given.");
 
-		Map<N, Map<K, SV>> namespaceMap =
-				stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
-
-		if (namespaceMap == null) {
-			return null;
-		}
-
-		Map<K, SV> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			return null;
-		}
-
-		SV result = keyedMap.get(key);
+		SV result = stateTable.get(key, namespace);
 
 		if (result == null) {
 			return null;
@@ -158,30 +108,14 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
 
 		@SuppressWarnings("unchecked,rawtypes")
 		TypeSerializer serializer = stateDesc.getSerializer();
-
 		return KvStateRequestSerializer.serializeValue(result, serializer);
 	}
 
 	/**
-	 * Creates a new map for use in Heap based state.
-	 *
-	 * <p>If the state queryable ({@link StateDescriptor#isQueryable()}, this
-	 * will create a concurrent hash map instead of a regular one.
-	 *
-	 * @return A new namespace map.
-	 */
-	protected <MK, MV> Map<MK, MV> createNewMap() {
-		if (stateDesc.isQueryable()) {
-			return new ConcurrentHashMap<>();
-		} else {
-			return new HashMap<>();
-		}
-	}
-
-	/**
 	 * This should only be used for testing.
 	 */
+	@VisibleForTesting
 	public StateTable<K, N, SV> getStateTable() {
 		return stateTable;
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
new file mode 100644
index 0000000..b0d7727
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Abstract base class that encapsulates the logic for taking snapshots of {@link StateTable} implementations and also
+ * defines how a snapshot is written during the serialization phase of checkpointing.
+ */
+@Internal
+abstract class AbstractStateTableSnapshot<K, N, S, T extends StateTable<K, N, S>> implements StateTableSnapshot {
+
+	/**
+	 * The {@link StateTable} from which this snapshot was created.
+	 */
+	final T owningStateTable;
+
+	/**
+	 * Creates a new {@link AbstractStateTableSnapshot} of the given table, which also owns the snapshot.
+	 *
+	 * @param owningStateTable the {@link StateTable} for which this object represents a snapshot; must not be null.
+	 */
+	AbstractStateTableSnapshot(T owningStateTable) {
+		this.owningStateTable = Preconditions.checkNotNull(owningStateTable);
+	}
+
+	/**
+	 * Optional hook to release resources held by this snapshot at the end of its lifecycle. Does nothing by default.
+	 */
+	@Override
+	public void release() {
+	}
+}
\ No newline at end of file