Posted to commits@flink.apache.org by sr...@apache.org on 2017/03/16 17:34:26 UTC

[1/4] flink git commit: [FLINK-5715] Asynchronous snapshots for heap-based keyed state backend

Repository: flink
Updated Branches:
  refs/heads/master 30bb958a7 -> ab014ef94


http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
new file mode 100644
index 0000000..08896da
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class CopyOnWriteStateTableTest {
+
+	/**
+	 * Tests the basic map operations.
+	 */
+	@Test
+	public void testPutGetRemoveContainsTransform() throws Exception {
+		RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+				new RegisteredBackendStateMetaInfo<>(
+						StateDescriptor.Type.UNKNOWN,
+						"test",
+						IntSerializer.INSTANCE,
+						new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+		final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+		final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
+				new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+		ArrayList<Integer> state_1_1 = new ArrayList<>();
+		state_1_1.add(41);
+		ArrayList<Integer> state_2_1 = new ArrayList<>();
+		state_2_1.add(42);
+		ArrayList<Integer> state_1_2 = new ArrayList<>();
+		state_1_2.add(43);
+
+		Assert.assertNull(stateTable.putAndGetOld(1, 1, state_1_1));
+		Assert.assertEquals(state_1_1, stateTable.get(1, 1));
+		Assert.assertEquals(1, stateTable.size());
+
+		Assert.assertNull(stateTable.putAndGetOld(2, 1, state_2_1));
+		Assert.assertEquals(state_2_1, stateTable.get(2, 1));
+		Assert.assertEquals(2, stateTable.size());
+
+		Assert.assertNull(stateTable.putAndGetOld(1, 2, state_1_2));
+		Assert.assertEquals(state_1_2, stateTable.get(1, 2));
+		Assert.assertEquals(3, stateTable.size());
+
+		Assert.assertTrue(stateTable.containsKey(2, 1));
+		Assert.assertFalse(stateTable.containsKey(3, 1));
+		Assert.assertFalse(stateTable.containsKey(2, 3));
+		stateTable.put(2, 1, null);
+		Assert.assertTrue(stateTable.containsKey(2, 1));
+		Assert.assertEquals(3, stateTable.size());
+		Assert.assertNull(stateTable.get(2, 1));
+		stateTable.put(2, 1, state_2_1);
+		Assert.assertEquals(3, stateTable.size());
+
+		Assert.assertEquals(state_2_1, stateTable.removeAndGetOld(2, 1));
+		Assert.assertFalse(stateTable.containsKey(2, 1));
+		Assert.assertEquals(2, stateTable.size());
+
+		stateTable.remove(1, 2);
+		Assert.assertFalse(stateTable.containsKey(1, 2));
+		Assert.assertEquals(1, stateTable.size());
+
+		Assert.assertNull(stateTable.removeAndGetOld(4, 2));
+		Assert.assertEquals(1, stateTable.size());
+
+		StateTransformationFunction<ArrayList<Integer>, Integer> function =
+				new StateTransformationFunction<ArrayList<Integer>, Integer>() {
+					@Override
+					public ArrayList<Integer> apply(ArrayList<Integer> previousState, Integer value) throws Exception {
+						previousState.add(value);
+						return previousState;
+					}
+				};
+
+		final int value = 4711;
+		stateTable.transform(1, 1, value, function);
+		state_1_1 = function.apply(state_1_1, value);
+		Assert.assertEquals(state_1_1, stateTable.get(1, 1));
+	}
+
+	/**
+	 * This test triggers an incremental rehash and checks for table corruption.
+	 */
+	@Test
+	public void testIncrementalRehash() {
+		RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+				new RegisteredBackendStateMetaInfo<>(
+						StateDescriptor.Type.UNKNOWN,
+						"test",
+						IntSerializer.INSTANCE,
+						new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+		final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+		final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
+				new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+		int insert = 0;
+		int remove = 0;
+		while (!stateTable.isRehashing()) {
+			stateTable.put(insert++, 0, new ArrayList<Integer>());
+			if (insert % 8 == 0) {
+				stateTable.remove(remove++, 0);
+			}
+		}
+		Assert.assertEquals(insert - remove, stateTable.size());
+		while (stateTable.isRehashing()) {
+			stateTable.put(insert++, 0, new ArrayList<Integer>());
+			if (insert % 8 == 0) {
+				stateTable.remove(remove++, 0);
+			}
+		}
+		Assert.assertEquals(insert - remove, stateTable.size());
+
+		for (int i = 0; i < insert; ++i) {
+			if (i < remove) {
+				Assert.assertFalse(stateTable.containsKey(i, 0));
+			} else {
+				Assert.assertTrue(stateTable.containsKey(i, 0));
+			}
+		}
+	}
+
+	/**
+	 * This test applies random modifications to a state table and to a reference (hash map), then draws snapshots,
+	 * performs further modifications, and checks snapshot integrity.
+	 */
+	@Test
+	public void testRandomModificationsAndCopyOnWriteIsolation() throws Exception {
+
+		final RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+				new RegisteredBackendStateMetaInfo<>(
+						StateDescriptor.Type.UNKNOWN,
+						"test",
+						IntSerializer.INSTANCE,
+						new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+		final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+		final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
+				new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+		final HashMap<Tuple2<Integer, Integer>, ArrayList<Integer>> referenceMap = new HashMap<>();
+
+		final Random random = new Random(42);
+
+		// holds snapshots from the map under test
+		CopyOnWriteStateTable.StateTableEntry<Integer, Integer, ArrayList<Integer>>[] snapshot = null;
+		int snapshotSize = 0;
+
+		// holds a reference snapshot from our reference map that we compare against
+		Tuple3<Integer, Integer, ArrayList<Integer>>[] reference = null;
+
+		int val = 0;
+
+
+		int snapshotCounter = 0;
+		int referencedSnapshotId = 0;
+
+		final StateTransformationFunction<ArrayList<Integer>, Integer> transformationFunction =
+				new StateTransformationFunction<ArrayList<Integer>, Integer>() {
+					@Override
+					public ArrayList<Integer> apply(ArrayList<Integer> previousState, Integer value) throws Exception {
+						if (previousState == null) {
+							previousState = new ArrayList<>();
+						}
+						previousState.add(value);
+						// we give back the original, attempting to spot errors in the copy-on-write logic
+						return previousState;
+					}
+				};
+
+		// the main loop for modifications
+		for (int i = 0; i < 10_000_000; ++i) {
+
+			int key = random.nextInt(20);
+			int namespace = random.nextInt(4);
+			Tuple2<Integer, Integer> compositeKey = new Tuple2<>(key, namespace);
+
+			int op = random.nextInt(7);
+
+			ArrayList<Integer> state = null;
+			ArrayList<Integer> referenceState = null;
+
+			switch (op) {
+				case 0:
+				case 1: {
+					state = stateTable.get(key, namespace);
+					referenceState = referenceMap.get(compositeKey);
+					if (null == state) {
+						state = new ArrayList<>();
+						stateTable.put(key, namespace, state);
+						referenceState = new ArrayList<>();
+						referenceMap.put(compositeKey, referenceState);
+					}
+					break;
+				}
+				case 2: {
+					stateTable.put(key, namespace, new ArrayList<Integer>());
+					referenceMap.put(compositeKey, new ArrayList<Integer>());
+					break;
+				}
+				case 3: {
+					state = stateTable.putAndGetOld(key, namespace, new ArrayList<Integer>());
+					referenceState = referenceMap.put(compositeKey, new ArrayList<Integer>());
+					break;
+				}
+				case 4: {
+					stateTable.remove(key, namespace);
+					referenceMap.remove(compositeKey);
+					break;
+				}
+				case 5: {
+					state = stateTable.removeAndGetOld(key, namespace);
+					referenceState = referenceMap.remove(compositeKey);
+					break;
+				}
+				case 6: {
+					final int updateValue = random.nextInt(1000);
+					stateTable.transform(key, namespace, updateValue, transformationFunction);
+					referenceMap.put(compositeKey, transformationFunction.apply(
+							referenceMap.remove(compositeKey), updateValue));
+					break;
+				}
+				default: {
+					Assert.fail("Unknown op-code " + op);
+				}
+			}
+
+			Assert.assertEquals(referenceMap.size(), stateTable.size());
+
+			if (state != null) {
+				// mutate the states a bit...
+				if (random.nextBoolean() && !state.isEmpty()) {
+					state.remove(state.size() - 1);
+					referenceState.remove(referenceState.size() - 1);
+				} else {
+					state.add(val);
+					referenceState.add(val);
+					++val;
+				}
+			}
+
+			Assert.assertEquals(referenceState, state);
+
+			// snapshot triggering / comparison / release
+			if (i > 0 && i % 500 == 0) {
+
+				if (snapshot != null) {
+					// check our referenced snapshot
+					deepCheck(reference, convert(snapshot, snapshotSize));
+
+					if (i % 1_000 == 0) {
+						// draw and release some other snapshot while holding on to the old snapshot
+						++snapshotCounter;
+						stateTable.snapshotTableArrays();
+						stateTable.releaseSnapshot(snapshotCounter);
+					}
+
+					// release the snapshot after some time
+					if (i % 5_000 == 0) {
+						snapshot = null;
+						reference = null;
+						snapshotSize = 0;
+						stateTable.releaseSnapshot(referencedSnapshotId);
+					}
+
+				} else {
+					// if there is no more referenced snapshot, we create one
+					++snapshotCounter;
+					referencedSnapshotId = snapshotCounter;
+					snapshot = stateTable.snapshotTableArrays();
+					snapshotSize = stateTable.size();
+					reference = manualDeepDump(referenceMap);
+				}
+			}
+		}
+	}
+
+	/**
+	 * This tests the copy-on-write contracts, e.g. it ensures that no copy-on-write is active after all snapshots
+	 * have been released.
+	 */
+	@Test
+	public void testCopyOnWriteContracts() {
+		RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+				new RegisteredBackendStateMetaInfo<>(
+						StateDescriptor.Type.UNKNOWN,
+						"test",
+						IntSerializer.INSTANCE,
+						new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+		final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+		final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
+				new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+		ArrayList<Integer> originalState1 = new ArrayList<>(1);
+		ArrayList<Integer> originalState2 = new ArrayList<>(1);
+		ArrayList<Integer> originalState3 = new ArrayList<>(1);
+		ArrayList<Integer> originalState4 = new ArrayList<>(1);
+		ArrayList<Integer> originalState5 = new ArrayList<>(1);
+
+		originalState1.add(1);
+		originalState2.add(2);
+		originalState3.add(3);
+		originalState4.add(4);
+		originalState5.add(5);
+
+		stateTable.put(1, 1, originalState1);
+		stateTable.put(2, 1, originalState2);
+		stateTable.put(4, 1, originalState4);
+		stateTable.put(5, 1, originalState5);
+
+		// no snapshot taken, we get the original back
+		Assert.assertTrue(stateTable.get(1, 1) == originalState1);
+		CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot1 = stateTable.createSnapshot();
+		// after snapshot1 is taken, we get a copy...
+		final ArrayList<Integer> copyState = stateTable.get(1, 1);
+		Assert.assertFalse(copyState == originalState1);
+		// ...and the copy is equal
+		Assert.assertEquals(originalState1, copyState);
+
+		// we make an insert AFTER snapshot1
+		stateTable.put(3, 1, originalState3);
+
+		// on repeated lookups, we get the same copy because no further snapshot was taken
+		Assert.assertTrue(copyState == stateTable.get(1, 1));
+
+		// we take snapshot2
+		CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot2 = stateTable.createSnapshot();
+		// after the second snapshot, copy-on-write is active again for old entries
+		Assert.assertFalse(copyState == stateTable.get(1, 1));
+		// and equality still holds
+		Assert.assertEquals(copyState, stateTable.get(1, 1));
+
+		// after releasing snapshot2
+		stateTable.releaseSnapshot(snapshot2);
+		// we still get the original of the untouched late insert (after snapshot1)
+		Assert.assertTrue(originalState3 == stateTable.get(3, 1));
+		// but copy-on-write is still active for older inserts (before snapshot1)
+		Assert.assertFalse(originalState4 == stateTable.get(4, 1));
+
+		// after releasing snapshot1
+		stateTable.releaseSnapshot(snapshot1);
+		// no copy-on-write is active
+		Assert.assertTrue(originalState5 == stateTable.get(5, 1));
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <K, N, S> Tuple3<K, N, S>[] convert(CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshot, int mapSize) {
+
+		Tuple3<K, N, S>[] result = new Tuple3[mapSize];
+		int pos = 0;
+		for (CopyOnWriteStateTable.StateTableEntry<K, N, S> entry : snapshot) {
+			while (null != entry) {
+				result[pos++] = new Tuple3<>(entry.getKey(), entry.getNamespace(), entry.getState());
+				entry = entry.next;
+			}
+		}
+		Assert.assertEquals(mapSize, pos);
+		return result;
+	}
+
+	@SuppressWarnings("unchecked")
+	private Tuple3<Integer, Integer, ArrayList<Integer>>[] manualDeepDump(
+			HashMap<Tuple2<Integer, Integer>,
+					ArrayList<Integer>> map) {
+
+		Tuple3<Integer, Integer, ArrayList<Integer>>[] result = new Tuple3[map.size()];
+		int pos = 0;
+		for (Map.Entry<Tuple2<Integer, Integer>, ArrayList<Integer>> entry : map.entrySet()) {
+			Integer key = entry.getKey().f0;
+			Integer namespace = entry.getKey().f1;
+			result[pos++] = new Tuple3<>(key, namespace, new ArrayList<>(entry.getValue()));
+		}
+		return result;
+	}
+
+	private void deepCheck(
+			Tuple3<Integer, Integer, ArrayList<Integer>>[] a,
+			Tuple3<Integer, Integer, ArrayList<Integer>>[] b) {
+
+		if (a == b) {
+			return;
+		}
+
+		Assert.assertEquals(a.length, b.length);
+
+		Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>> comparator =
+				new Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>>() {
+
+					@Override
+					public int compare(Tuple3<Integer, Integer, ArrayList<Integer>> o1, Tuple3<Integer, Integer, ArrayList<Integer>> o2) {
+						int namespaceDiff = o1.f1 - o2.f1;
+						return namespaceDiff != 0 ? namespaceDiff : o1.f0 - o2.f0;
+					}
+				};
+
+		Arrays.sort(a, comparator);
+		Arrays.sort(b, comparator);
+
+		for (int i = 0; i < a.length; ++i) {
+			Tuple3<Integer, Integer, ArrayList<Integer>> av = a[i];
+			Tuple3<Integer, Integer, ArrayList<Integer>> bv = b[i];
+
+			Assert.assertEquals(av.f0, bv.f0);
+			Assert.assertEquals(av.f1, bv.f1);
+			Assert.assertEquals(av.f2, bv.f2);
+		}
+	}
+
+	static class MockInternalKeyContext<T> implements InternalKeyContext<T> {
+
+		private T key;
+		private final TypeSerializer<T> serializer;
+		private final KeyGroupRange keyGroupRange;
+
+		public MockInternalKeyContext(TypeSerializer<T> serializer) {
+			this.serializer = serializer;
+			this.keyGroupRange = new KeyGroupRange(0, 0);
+		}
+
+		public void setKey(T key) {
+			this.key = key;
+		}
+
+		@Override
+		public T getCurrentKey() {
+			return key;
+		}
+
+		@Override
+		public int getCurrentKeyGroupIndex() {
+			return 0;
+		}
+
+		@Override
+		public int getNumberOfKeyGroups() {
+			return 1;
+		}
+
+		@Override
+		public KeyGroupRange getKeyGroupRange() {
+			return keyGroupRange;
+		}
+
+		@Override
+		public TypeSerializer<T> getKeySerializer() {
+			return serializer;
+		}
+	}
+}
\ No newline at end of file
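
Taken together, the assertions in testCopyOnWriteContracts pin down the snapshot lifecycle that CopyOnWriteStateTable guarantees. Below is a condensed sketch of that contract, using only the internal calls that appear in the test above (identity checks written as plain asserts; `stateTable` is a CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> as in the test):

	ArrayList<Integer> original = new ArrayList<>();
	original.add(1);
	stateTable.put(1, 1, original);

	// no snapshot in flight: lookups return the original object
	assert stateTable.get(1, 1) == original;

	CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot =
			stateTable.createSnapshot();

	// snapshot in flight: the first lookup hands out a defensive copy...
	ArrayList<Integer> copy = stateTable.get(1, 1);
	assert copy != original && copy.equals(original);
	// ...and repeated lookups return that same copy until the next snapshot is taken
	assert copy == stateTable.get(1, 1);

	stateTable.releaseSnapshot(snapshot);
	// once all snapshots are released, copy-on-write is inactive again and
	// entries inserted from here on are handed out as originals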

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
index 735b5f5..cb4e403 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
@@ -23,25 +23,20 @@ import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-
 import org.junit.Test;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests for the simple Java heap objects implementation of the {@link AggregatingState}.
  */
-public class HeapAggregatingStateTest {
+public class HeapAggregatingStateTest extends HeapStateBackendTestBase {
 
 	@Test
 	public void testAddAndGet() throws Exception {
@@ -227,20 +222,6 @@ public class HeapAggregatingStateTest {
 	}
 
 	// ------------------------------------------------------------------------
-	//  utilities
-	// ------------------------------------------------------------------------
-
-	private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
-		return new HeapKeyedStateBackend<>(
-				mock(TaskKvStateRegistry.class),
-				StringSerializer.INSTANCE,
-				HeapAggregatingStateTest.class.getClassLoader(),
-				16,
-				new KeyGroupRange(0, 15),
-				new ExecutionConfig());
-	}
-
-	// ------------------------------------------------------------------------
 	//  test functions
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
new file mode 100644
index 0000000..da0666a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.net.URL;
+import java.util.Collections;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests backwards compatibility in the serialization format of heap-based KeyedStateBackends.
+ */
+public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackendTestBase {
+
+	/**
+	 * [FLINK-5979]
+	 *
+	 * This test takes a snapshot that was created with Flink 1.2 and tries to restore it in master to check
+	 * the backwards compatibility of the serialization format of {@link StateTable}s.
+	 */
+	@Test
+	public void testRestore1_2ToMaster() throws Exception {
+
+		ClassLoader cl = getClass().getClassLoader();
+		URL resource = cl.getResource("heap_keyed_statebackend_1_2.snapshot");
+
+		Preconditions.checkNotNull(resource, "Binary snapshot resource not found!");
+
+		final Integer namespace1 = 1;
+		final Integer namespace2 = 2;
+		final Integer namespace3 = 3;
+
+		try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend()) {
+			final KeyGroupsStateHandle stateHandle;
+			try (BufferedInputStream bis = new BufferedInputStream((new FileInputStream(resource.getFile())))) {
+				stateHandle = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader());
+			}
+			keyedBackend.restore(Collections.singleton(stateHandle));
+			final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+			stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+			InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
+
+			assertEquals(7, keyedBackend.numStateEntries());
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			assertEquals(asList(33L, 55L), state.get());
+			state.setCurrentNamespace(namespace2);
+			assertEquals(asList(22L, 11L), state.get());
+			state.setCurrentNamespace(namespace3);
+			assertEquals(Collections.singletonList(44L), state.get());
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			assertEquals(asList(11L, 44L), state.get());
+
+			state.setCurrentNamespace(namespace3);
+			assertEquals(asList(22L, 55L, 33L), state.get());
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			assertEquals(asList(11L, 22L, 33L, 44L, 55L), state.get());
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace3);
+			assertEquals(asList(11L, 22L, 33L, 44L, 55L), state.get());
+		}
+	}
+
+//	/**
+//	 * This code was used to create the binary snapshot file of the old version that is used by this test. If you
+//	 * need to recreate the binary, uncomment this method and run it.
+//	 */
+//	private void createBinarySnapshot() throws Exception {
+//
+//		final String pathToWrite = "/PATH/TO/WRITE";
+//
+//		final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+//		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+//
+//		final Integer namespace1 = 1;
+//		final Integer namespace2 = 2;
+//		final Integer namespace3 = 3;
+//
+//		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+//
+//		try {
+//			InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
+//
+//			keyedBackend.setCurrentKey("abc");
+//			state.setCurrentNamespace(namespace1);
+//			state.add(33L);
+//			state.add(55L);
+//
+//			state.setCurrentNamespace(namespace2);
+//			state.add(22L);
+//			state.add(11L);
+//
+//			state.setCurrentNamespace(namespace3);
+//			state.add(44L);
+//
+//			keyedBackend.setCurrentKey("def");
+//			state.setCurrentNamespace(namespace1);
+//			state.add(11L);
+//			state.add(44L);
+//
+//			state.setCurrentNamespace(namespace3);
+//			state.add(22L);
+//			state.add(55L);
+//			state.add(33L);
+//
+//			keyedBackend.setCurrentKey("jkl");
+//			state.setCurrentNamespace(namespace1);
+//			state.add(11L);
+//			state.add(22L);
+//			state.add(33L);
+//			state.add(44L);
+//			state.add(55L);
+//
+//			keyedBackend.setCurrentKey("mno");
+//			state.setCurrentNamespace(namespace3);
+//			state.add(11L);
+//			state.add(22L);
+//			state.add(33L);
+//			state.add(44L);
+//			state.add(55L);
+//			RunnableFuture<KeyGroupsStateHandle> snapshot = keyedBackend.snapshot(
+//					0L,
+//					0L,
+//					new MemCheckpointStreamFactory(4 * 1024 * 1024),
+//					CheckpointOptions.forFullCheckpoint());
+//
+//			snapshot.run();
+//
+//			try (BufferedOutputStream bis = new BufferedOutputStream(new FileOutputStream(pathToWrite))) {
+//				InstantiationUtil.serializeObject(bis, snapshot.get());
+//			}
+//
+//		} finally {
+//			keyedBackend.close();
+//			keyedBackend.dispose();
+//		}
+//	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index c36a48b..7705c19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -22,13 +22,9 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalListState;
-
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -39,12 +35,11 @@ import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests for the simple Java heap objects implementation of the {@link ListState}.
  */
-public class HeapListStateTest {
+public class HeapListStateTest extends HeapStateBackendTestBase {
 
 	@Test
 	public void testAddAndGet() throws Exception {
@@ -225,16 +220,6 @@ public class HeapListStateTest {
 			keyedBackend.dispose();
 		}
 	}
-
-	private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
-		return new HeapKeyedStateBackend<>(
-				mock(TaskKvStateRegistry.class),
-				StringSerializer.INSTANCE,
-				HeapListStateTest.class.getClassLoader(),
-				16,
-				new KeyGroupRange(0, 15),
-				new ExecutionConfig());
-	}
 	
 	private static <T> void validateResult(Iterable<T> values, Set<T> expected) {
 		int num = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
index 63eec04..928eaec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
@@ -23,25 +23,20 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
-
 import org.junit.Test;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests for the simple Java heap objects implementation of the {@link ReducingState}.
  */
-public class HeapReducingStateTest {
+public class HeapReducingStateTest extends HeapStateBackendTestBase {
 
 	@Test
 	public void testAddAndGet() throws Exception {
@@ -214,7 +209,7 @@ public class HeapReducingStateTest {
 			keyedBackend.setCurrentKey("mno");
 			state.setCurrentNamespace(namespace1);
 			state.clear();
-			
+
 			StateTable<String, Integer, Long> stateTable =
 					((HeapReducingState<String, Integer, Long>) state).stateTable;
 
@@ -227,20 +222,6 @@ public class HeapReducingStateTest {
 	}
 
 	// ------------------------------------------------------------------------
-	//  utilities
-	// ------------------------------------------------------------------------
-
-	private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
-		return new HeapKeyedStateBackend<>(
-				mock(TaskKvStateRegistry.class),
-				StringSerializer.INSTANCE,
-				HeapReducingStateTest.class.getClassLoader(),
-				16,
-				new KeyGroupRange(0, 15),
-				new ExecutionConfig());
-	}
-
-	// ------------------------------------------------------------------------
 	//  test functions
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
new file mode 100644
index 0000000..e6adef8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+@RunWith(Parameterized.class)
+public abstract class HeapStateBackendTestBase {
+
+	@Parameterized.Parameters
+	public static Collection<Boolean> parameters() {
+		return Arrays.asList(false, true);
+	}
+
+	@Parameterized.Parameter
+	public boolean async;
+
+	public HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
+		return new HeapKeyedStateBackend<>(
+				mock(TaskKvStateRegistry.class),
+				StringSerializer.INSTANCE,
+				HeapReducingStateTest.class.getClassLoader(),
+				16,
+				new KeyGroupRange(0, 15),
+				async,
+				new ExecutionConfig());
+	}
+}
\ No newline at end of file
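
Every heap state test in this commit now inherits from this base, so each test body runs twice: once with synchronous and once with asynchronous snapshots. A minimal sketch of a consuming test (ExampleHeapStateTest is hypothetical; the try-with-resources pattern mirrors the migration test above):

	public class ExampleHeapStateTest extends HeapStateBackendTestBase {

		@Test
		public void testBackendRoundTrip() throws Exception {
			// the Parameterized runner re-invokes this method for each value
			// returned by parameters(), i.e. with async = false and async = true
			try (HeapKeyedStateBackend<String> backend = createKeyedBackend()) {
				// exercise keyed state against `backend` here
			}
		}
	}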

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
new file mode 100644
index 0000000..6fd94f7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+
+public class StateTableSnapshotCompatibilityTest {
+
+	/**
+	 * This test ensures that different implementations of {@link StateTable} are compatible in their serialization
+	 * format.
+	 */
+	@Test
+	public void checkCompatibleSerializationFormats() throws IOException {
+		final Random r = new Random(42);
+		RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+				new RegisteredBackendStateMetaInfo<>(
+						StateDescriptor.Type.UNKNOWN,
+						"test",
+						IntSerializer.INSTANCE,
+						new ArrayListSerializer<>(IntSerializer.INSTANCE));
+
+		final CopyOnWriteStateTableTest.MockInternalKeyContext<Integer> keyContext =
+				new CopyOnWriteStateTableTest.MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+		CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> cowStateTable =
+				new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+		for (int i = 0; i < 100; ++i) {
+			ArrayList<Integer> list = new ArrayList<>(5);
+			int end = r.nextInt(5);
+			for (int j = 0; j < end; ++j) {
+				list.add(r.nextInt(100));
+			}
+
+			cowStateTable.put(r.nextInt(10), r.nextInt(2), list);
+		}
+
+		StateTableSnapshot snapshot = cowStateTable.createSnapshot();
+
+		final NestedMapsStateTable<Integer, Integer, ArrayList<Integer>> nestedMapsStateTable =
+				new NestedMapsStateTable<>(keyContext, metaInfo);
+
+		restoreStateTableFromSnapshot(nestedMapsStateTable, snapshot, keyContext.getKeyGroupRange());
+		snapshot.release();
+
+
+		Assert.assertEquals(cowStateTable.size(), nestedMapsStateTable.size());
+		for (StateEntry<Integer, Integer, ArrayList<Integer>> entry : cowStateTable) {
+			Assert.assertEquals(entry.getState(), nestedMapsStateTable.get(entry.getKey(), entry.getNamespace()));
+		}
+
+		snapshot = nestedMapsStateTable.createSnapshot();
+		cowStateTable = new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+		restoreStateTableFromSnapshot(cowStateTable, snapshot, keyContext.getKeyGroupRange());
+		snapshot.release();
+
+		Assert.assertEquals(nestedMapsStateTable.size(), cowStateTable.size());
+		for (StateEntry<Integer, Integer, ArrayList<Integer>> entry : cowStateTable) {
+			Assert.assertEquals(nestedMapsStateTable.get(entry.getKey(), entry.getNamespace()), entry.getState());
+		}
+	}
+
+	private static <K, N, S> void restoreStateTableFromSnapshot(
+			StateTable<K, N, S> stateTable,
+			StateTableSnapshot snapshot,
+			KeyGroupRange keyGroupRange) throws IOException {
+
+		final ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(1024 * 1024);
+		final DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
+
+		for (Integer keyGroup : keyGroupRange) {
+			snapshot.writeMappingsInKeyGroup(dov, keyGroup);
+		}
+
+		final ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(out.getBuf());
+		final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
+
+		final StateTableByKeyGroupReader keyGroupReader =
+				StateTableByKeyGroupReaders.readerForVersion(stateTable, KeyedBackendSerializationProxy.VERSION);
+
+		for (Integer keyGroup : keyGroupRange) {
+			keyGroupReader.readMappingsInKeyGroup(div, keyGroup);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
new file mode 100644
index 0000000..291f3ed
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import java.io.IOException;
+
+/**
+ * A {@link CheckpointStreamFactory} for tests that allows testing the cancellation of asynchronous I/O.
+ */
+@VisibleForTesting
+@Internal
+public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
+
+	private final int maxSize;
+	private volatile int afterNumberInvocations;
+	private volatile OneShotLatch blocker;
+	private volatile OneShotLatch waiter;
+
+	MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
+
+	public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
+		return lastCreatedStream;
+	}
+
+	public BlockerCheckpointStreamFactory(int maxSize) {
+		this.maxSize = maxSize;
+	}
+
+	public void setAfterNumberInvocations(int afterNumberInvocations) {
+		this.afterNumberInvocations = afterNumberInvocations;
+	}
+
+	public void setBlockerLatch(OneShotLatch latch) {
+		this.blocker = latch;
+	}
+
+	public void setWaiterLatch(OneShotLatch latch) {
+		this.waiter = latch;
+	}
+
+	@Override
+	public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+		this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
+
+			private int afterNInvocations = afterNumberInvocations;
+			private final OneShotLatch streamBlocker = blocker;
+			private final OneShotLatch streamWaiter = waiter;
+
+			@Override
+			public void write(int b) throws IOException {
+
+				if (null != streamWaiter) {
+					streamWaiter.trigger();
+				}
+
+				if (afterNInvocations > 0) {
+					--afterNInvocations;
+				}
+
+				if (0 == afterNInvocations && null != streamBlocker) {
+					try {
+						streamBlocker.await();
+					} catch (InterruptedException ignored) {
+					}
+				}
+				try {
+					super.write(b);
+				} catch (IOException ex) {
+					if (null != streamWaiter) {
+						streamWaiter.trigger();
+					}
+					throw ex;
+				}
+
+				if (0 == afterNInvocations && null != streamWaiter) {
+					streamWaiter.trigger();
+				}
+			}
+
+			@Override
+			public void close() {
+				super.close();
+				if (null != streamWaiter) {
+					streamWaiter.trigger();
+				}
+			}
+		};
+
+		return lastCreatedStream;
+	}
+
+	@Override
+	public void close() throws Exception {
+
+	}
+}
\ No newline at end of file
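
A hypothetical usage sketch for this factory (snapshotRunnable stands in for test code that drives an asynchronous snapshot of a keyed backend on another thread): the created stream signals the waiter latch on entering write(), blocks on the blocker latch once the configured number of bytes has been written, and the test parks the snapshot there, makes its assertions, and then releases it.

	BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
	OneShotLatch blocker = new OneShotLatch();
	OneShotLatch waiter = new OneShotLatch();
	streamFactory.setBlockerLatch(blocker);
	streamFactory.setWaiterLatch(waiter);
	streamFactory.setAfterNumberInvocations(10);

	Thread snapshotThread = new Thread(snapshotRunnable); // hypothetical snapshot driver
	snapshotThread.start();

	waiter.await();    // the snapshot thread is now parked inside write()
	// ... assert that the backend still serves reads and writes, or cancel the snapshot ...
	blocker.trigger(); // release the parked write() and let the snapshot finish
	snapshotThread.join();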

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot
new file mode 100644
index 0000000..b9171bc
Binary files /dev/null and b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 05c89dd..781c320 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.util.MathUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -98,9 +99,7 @@ public class TimeWindow extends Window {
 
 	@Override
 	public int hashCode() {
-		int result = (int) (start ^ (start >>> 32));
-		result = 31 * result + (int) (end ^ (end >>> 32));
-		return result;
+		return MathUtils.longToIntWithBitMixing(start + end);
 	}
 
 	@Override
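
The old hashCode folded start and end with the textbook (int) (x ^ (x >>> 32)) pattern, which distributes poorly here: aligned time windows have strongly correlated start/end values, so many windows land in few hash buckets. The replacement runs start + end through an avalanching bit mixer instead. For illustration, a typical 64-bit finalizer of the kind the name longToIntWithBitMixing suggests (a sketch, not necessarily the exact body of Flink's MathUtils):

	// splitmix64-style finalizer: flips roughly half of the output bits
	// for any single-bit change in the input
	static int longToIntWithBitMixing(long in) {
		in = (in ^ (in >>> 30)) * 0xbf58476d1ce4e5b9L;
		in = (in ^ (in >>> 27)) * 0x94d049bb133111ebL;
		in = in ^ (in >>> 31);
		return (int) in;
	}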

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 1911f44..5e966d1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -91,7 +91,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 	}
 
 	enum StateBackendEnum {
-		MEM, FILE, ROCKSDB_FULLY_ASYNC
+		MEM, FILE, ROCKSDB_FULLY_ASYNC, MEM_ASYNC, FILE_ASYNC
 	}
 
 	@BeforeClass
@@ -116,11 +116,19 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 	public void initStateBackend() throws IOException {
 		switch (stateBackendEnum) {
 			case MEM:
-				this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE);
+				this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
 				break;
 			case FILE: {
 				String backups = tempFolder.newFolder().getAbsolutePath();
-				this.stateBackend = new FsStateBackend("file://" + backups);
+				this.stateBackend = new FsStateBackend("file://" + backups, false);
+				break;
+			}
+			case MEM_ASYNC:
+				this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
+				break;
+			case FILE_ASYNC: {
+				String backups = tempFolder.newFolder().getAbsolutePath();
+				this.stateBackend = new FsStateBackend("file://" + backups, true);
 				break;
 			}
 			case ROCKSDB_FULLY_ASYNC: {
@@ -138,9 +146,9 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 	@Test
 	public void testTumblingTimeWindow() {
-		final int NUM_ELEMENTS_PER_KEY = 3000;
-		final int WINDOW_SIZE = 100;
-		final int NUM_KEYS = 100;
+		final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+		final int WINDOW_SIZE = windowSize();
+		final int NUM_KEYS = numKeys();
 		FailingSource.reset();
 		
 		try {
@@ -211,9 +219,9 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 	}
 
 	public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
-		final int NUM_ELEMENTS_PER_KEY = 3000;
-		final int WINDOW_SIZE = 100;
-		final int NUM_KEYS = 100;
+		final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+		final int WINDOW_SIZE = windowSize();
+		final int NUM_KEYS = numKeys();
 		FailingSource.reset();
 
 		try {
@@ -280,10 +288,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 	@Test
 	public void testSlidingTimeWindow() {
-		final int NUM_ELEMENTS_PER_KEY = 3000;
-		final int WINDOW_SIZE = 1000;
-		final int WINDOW_SLIDE = 100;
-		final int NUM_KEYS = 100;
+		final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+		final int WINDOW_SIZE = windowSize();
+		final int WINDOW_SLIDE = windowSlide();
+		final int NUM_KEYS = numKeys();
 		FailingSource.reset();
 
 		try {
@@ -346,9 +354,9 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 	@Test
 	public void testPreAggregatedTumblingTimeWindow() {
-		final int NUM_ELEMENTS_PER_KEY = 3000;
-		final int WINDOW_SIZE = 100;
-		final int NUM_KEYS = 100;
+		final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+		final int WINDOW_SIZE = windowSize();
+		final int NUM_KEYS = numKeys();
 		FailingSource.reset();
 
 		try {
@@ -418,10 +426,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 	@Test
 	public void testPreAggregatedSlidingTimeWindow() {
-		final int NUM_ELEMENTS_PER_KEY = 3000;
-		final int WINDOW_SIZE = 1000;
-		final int WINDOW_SLIDE = 100;
-		final int NUM_KEYS = 100;
+		final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+		final int WINDOW_SIZE = windowSize();
+		final int WINDOW_SLIDE = windowSlide();
+		final int NUM_KEYS = numKeys();
 		FailingSource.reset();
 
 		try {
@@ -790,4 +798,20 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 		public IntType(int value) { this.value = value; }
 	}
+
+	protected int numElementsPerKey() {
+		return 300;
+	}
+
+	protected int windowSize() {
+		return 100;
+	}
+
+	protected int windowSlide() {
+		return 100;
+	}
+
+	protected int numKeys() {
+		return 20;
+	}
 }
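
The new MEM_ASYNC and FILE_ASYNC cases use the boolean flag that this commit threads through the backend constructors. From the user side, enabling asynchronous snapshots for the heap-based backends then looks roughly like the following sketch (constructor signatures as they appear in the diff above; the checkpoint path is a placeholder):

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	// heap state, checkpoints held in memory, snapshots taken asynchronously
	env.setStateBackend(new MemoryStateBackend(5 * 1024 * 1024, true));

	// or: heap state, checkpoints written to a file system, asynchronous snapshots
	env.setStateBackend(new FsStateBackend("file:///path/to/checkpoints", true));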

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..a5bf10c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
+
+	public AsyncFileBackendEventTimeWindowCheckpointingITCase() {
+		super(StateBackendEnum.FILE_ASYNC);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..ef9ad37
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
+
+	public AsyncMemBackendEventTimeWindowCheckpointingITCase() {
+		super(StateBackendEnum.MEM_ASYNC);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
index 14feb78..da2bbc7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -23,4 +23,24 @@ public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEv
 	public RocksDbBackendEventTimeWindowCheckpointingITCase() {
 		super(StateBackendEnum.ROCKSDB_FULLY_ASYNC);
 	}
+
+	@Override
+	protected int numElementsPerKey() {
+		return 3000;
+	}
+
+	@Override
+	protected int windowSize() {
+		return 1000;
+	}
+
+	@Override
+	protected int windowSlide() {
+		return 100;
+	}
+
+	@Override
+	protected int numKeys() {
+		return 100;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
index 456861a..cbb56d0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
@@ -33,6 +33,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.util.Random;
+
 /**
  * A collection of manual tests that serve to assess the performance of windowed operations. These
  * run in local mode with parallelism 1 with a source that emits data as fast as possible. Thus,
@@ -241,11 +243,10 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase {
 
 		@Override
 		public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
-			long index = 0;
+			Random random = new Random(42);
 			while (running) {
-				Tuple2<String, Integer> tuple = new Tuple2<String, Integer>("Tuple " + (index % numKeys), 1);
+				Tuple2<String, Integer> tuple = new Tuple2<String, Integer>("Tuple " + (random.nextInt(numKeys)), 1);
 				out.collect(tuple);
-				index++;
 			}
 		}
 


[4/4] flink git commit: [FLINK-5715] Asynchronous snapshots for heap-based keyed state backend

Posted by sr...@apache.org.
[FLINK-5715] Asynchronous snapshots for heap-based keyed state backend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab014ef9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab014ef9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab014ef9

Branch: refs/heads/master
Commit: ab014ef94e0e9137ac6f8f41dae385ff71e8ba5b
Parents: 30bb958
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Mar 3 10:51:15 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Mar 16 18:34:02 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |    5 +
 .../state/RocksDBStateBackendTest.java          |   86 +-
 .../java/org/apache/flink/util/MathUtils.java   |   35 +-
 .../filesystem/AbstractFsStateSnapshot.java     |   30 +-
 .../state/memory/AbstractMemStateSnapshot.java  |   40 +-
 .../AbstractMigrationRestoreStrategy.java       |  117 ++
 .../state/memory/MigrationRestoreSnapshot.java  |   32 +
 .../state/AbstractKeyedStateBackend.java        |    7 +
 .../runtime/state/ArrayListSerializer.java      |   14 +-
 .../state/KeyedBackendSerializationProxy.java   |   23 +-
 .../flink/runtime/state/KeyedStateBackend.java  |   28 +-
 .../state/StateTransformationFunction.java      |   23 +
 .../state/filesystem/FsStateBackend.java        |  109 +-
 .../state/heap/AbstractHeapMergingState.java    |   93 +-
 .../runtime/state/heap/AbstractHeapState.java   |   84 +-
 .../state/heap/AbstractStateTableSnapshot.java  |   51 +
 .../state/heap/CopyOnWriteStateTable.java       | 1066 ++++++++++++++++++
 .../heap/CopyOnWriteStateTableSnapshot.java     |  188 +++
 .../state/heap/HeapAggregatingState.java        |   92 +-
 .../runtime/state/heap/HeapFoldingState.java    |   70 +-
 .../state/heap/HeapKeyedStateBackend.java       |  426 +++----
 .../flink/runtime/state/heap/HeapListState.java |   63 +-
 .../flink/runtime/state/heap/HeapMapState.java  |  167 +--
 .../runtime/state/heap/HeapReducingState.java   |   82 +-
 .../runtime/state/heap/HeapValueState.java      |   47 +-
 .../runtime/state/heap/InternalKeyContext.java  |   60 +
 .../state/heap/NestedMapsStateTable.java        |  363 ++++++
 .../flink/runtime/state/heap/StateEntry.java    |   44 +
 .../flink/runtime/state/heap/StateTable.java    |  222 ++--
 .../state/heap/StateTableByKeyGroupReader.java  |   38 +
 .../state/heap/StateTableByKeyGroupReaders.java |  136 +++
 .../runtime/state/heap/StateTableSnapshot.java  |   45 +
 .../state/memory/MemoryStateBackend.java        |   26 +
 .../runtime/query/QueryableStateClientTest.java |    7 +-
 .../message/KvStateRequestSerializerTest.java   |   38 +-
 .../state/AsyncFileStateBackendTest.java        |   27 +
 .../state/AsyncMemoryStateBackendTest.java      |   27 +
 .../runtime/state/FileStateBackendTest.java     |    8 +-
 .../runtime/state/MemoryStateBackendTest.java   |    8 +-
 .../runtime/state/StateBackendTestBase.java     |  216 +++-
 .../state/heap/CopyOnWriteStateTableTest.java   |  486 ++++++++
 .../state/heap/HeapAggregatingStateTest.java    |   21 +-
 ...pKeyedStateBackendSnapshotMigrationTest.java |  173 +++
 .../runtime/state/heap/HeapListStateTest.java   |   17 +-
 .../state/heap/HeapReducingStateTest.java       |   23 +-
 .../state/heap/HeapStateBackendTestBase.java    |   54 +
 .../StateTableSnapshotCompatibilityTest.java    |  118 ++
 .../util/BlockerCheckpointStreamFactory.java    |  118 ++
 .../heap_keyed_statebackend_1_2.snapshot        |  Bin 0 -> 2068 bytes
 .../api/windowing/windows/TimeWindow.java       |    5 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   64 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   26 +
 ...ckendEventTimeWindowCheckpointingITCase.java |   26 +
 ...ckendEventTimeWindowCheckpointingITCase.java |   20 +
 .../test/state/ManualWindowSpeedITCase.java     |    7 +-
 55 files changed, 4239 insertions(+), 1162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index aaccc2f..f585d21 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1219,4 +1219,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			// expected
 		}
 	}
+
+	@Override
+	public boolean supportsAsynchronousSnapshots() {
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index c7b5c20..708613b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -31,14 +31,13 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StateBackendTestBase;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -364,89 +363,6 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 		assertEquals(null, keyedStateBackend.db);
 	}
 
-	static class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
-
-		private final int maxSize;
-		private int afterNumberInvocations;
-		private OneShotLatch blocker;
-		private OneShotLatch waiter;
-
-		MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
-
-		public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
-			return lastCreatedStream;
-		}
-
-		public BlockerCheckpointStreamFactory(int maxSize) {
-			this.maxSize = maxSize;
-		}
-
-		public void setAfterNumberInvocations(int afterNumberInvocations) {
-			this.afterNumberInvocations = afterNumberInvocations;
-		}
-
-		public void setBlockerLatch(OneShotLatch latch) {
-			this.blocker = latch;
-		}
-
-		public void setWaiterLatch(OneShotLatch latch) {
-			this.waiter = latch;
-		}
-
-		@Override
-		public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
-			waiter.trigger();
-			this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
-
-				private int afterNInvocations = afterNumberInvocations;
-				private final OneShotLatch streamBlocker = blocker;
-				private final OneShotLatch streamWaiter = waiter;
-
-				@Override
-				public void write(int b) throws IOException {
-
-					if (afterNInvocations > 0) {
-						--afterNInvocations;
-					}
-
-					if (0 == afterNInvocations && null != streamBlocker) {
-						try {
-							streamBlocker.await();
-						} catch (InterruptedException ignored) {
-						}
-					}
-					try {
-						super.write(b);
-					} catch (IOException ex) {
-						if (null != streamWaiter) {
-							streamWaiter.trigger();
-						}
-						throw ex;
-					}
-
-					if (0 == afterNInvocations && null != streamWaiter) {
-						streamWaiter.trigger();
-					}
-				}
-
-				@Override
-				public void close() {
-					super.close();
-					if (null != streamWaiter) {
-						streamWaiter.trigger();
-					}
-				}
-			};
-
-			return lastCreatedStream;
-		}
-
-		@Override
-		public void close() throws Exception {
-
-		}
-	}
-
 	private static class AcceptAllFilter implements IOFileFilter {
 		@Override
 		public boolean accept(File file) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index 074e8ae..1d84a39 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -140,11 +140,7 @@ public final class MathUtils {
 		code = code * 5 + 0xe6546b64;
 
 		code ^= 4;
-		code ^= code >>> 16;
-		code *= 0x85ebca6b;
-		code ^= code >>> 13;
-		code *= 0xc2b2ae35;
-		code ^= code >>> 16;
+		code = bitMix(code);
 
 		if (code >= 0) {
 			return code;
@@ -172,6 +168,35 @@ public final class MathUtils {
 		return x + 1;
 	}
 
+	/**
+	 * Pseudo-randomly maps a long (64-bit) to an integer (32-bit) using some bit-mixing for better distribution.
+	 *
+	 * @param in the long (64-bit) input.
+	 * @return the bit-mixed int (32-bit) output.
+	 */
+	public static int longToIntWithBitMixing(long in) {
+		in = (in ^ (in >>> 30)) * 0xbf58476d1ce4e5b9L;
+		in = (in ^ (in >>> 27)) * 0x94d049bb133111ebL;
+		in = in ^ (in >>> 31);
+		return (int) in;
+	}
+
+	/**
+	 * Bit-mixing for pseudo-randomization of integers (e.g., to guard against bad hash functions). The
+	 * implementation is MurmurHash3's 32-bit finalizer.
+	 *
+	 * @param in the input value
+	 * @return the bit-mixed output value
+	 */
+	public static int bitMix(int in) {
+		in ^= in >>> 16;
+		in *= 0x85ebca6b;
+		in ^= in >>> 13;
+		in *= 0xc2b2ae35;
+		in ^= in >>> 16;
+		return in;
+	}
+
 	// ============================================================================================
 	
 	/**

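A self-contained sketch (not part of this commit) showing how the two new MathUtils helpers might be used to spread keys across power-of-two hash buckets; the demo class and bucket count are illustrative only:

    import org.apache.flink.util.MathUtils;

    public class BitMixDemo {
        public static void main(String[] args) {
            final int numBuckets = 16; // power of two, so masking replaces modulo

            for (long key = 0; key < 4; key++) {
                // fold a 64-bit value into a well-mixed 32-bit hash
                int mixed = MathUtils.longToIntWithBitMixing(key);
                int bucket = mixed & (numBuckets - 1);
                System.out.println("key " + key + " -> bucket " + bucket);
            }

            // guard a possibly poor hashCode() with the Murmur finalizer
            int guarded = MathUtils.bitMix("some-key".hashCode());
            System.out.println("guarded hash: " + guarded);
        }
    }
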
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
index 103c214..a15e49d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
@@ -21,8 +21,16 @@ package org.apache.flink.migration.runtime.state.filesystem;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.memory.AbstractMigrationRestoreStrategy;
+import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
 
 import java.io.IOException;
 
@@ -36,7 +44,7 @@ import java.io.IOException;
 @Deprecated
 @SuppressWarnings("deprecation")
 public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> 
-		extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD> {
+		extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD>, MigrationRestoreSnapshot<K, N, SV> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -85,4 +93,24 @@ public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD exte
 	public SD getStateDesc() {
 		return stateDesc;
 	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public StateTable<K, N, SV> deserialize(
+			String stateName,
+			HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+		final FileSystem fs = getFilePath().getFileSystem();
+		try (FSDataInputStream inStream = fs.open(getFilePath())) {
+			final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
+			AbstractMigrationRestoreStrategy<K, N, SV> restoreStrategy =
+					new AbstractMigrationRestoreStrategy<K, N, SV>(keySerializer, namespaceSerializer, stateSerializer) {
+						@Override
+						protected DataInputView openDataInputView() throws IOException {
+							return inView;
+						}
+					};
+			return restoreStrategy.deserialize(stateName, stateBackend);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
index 6056578..ff86f7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
@@ -21,17 +21,18 @@ package org.apache.flink.migration.runtime.state.memory;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 
 @Deprecated
 @SuppressWarnings("deprecation")
 public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> 
-		implements KvStateSnapshot<K, N, S, SD> {
+		implements KvStateSnapshot<K, N, S, SD>, MigrationRestoreSnapshot<K, N, SV> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -73,24 +74,21 @@ public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD ext
 		this.data = data;
 	}
 
-	public HashMap<N, Map<K, SV>> deserialize() throws IOException {
-		DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length);
-
-		final int numKeys = inView.readInt();
-		HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);
-
-		for (int i = 0; i < numKeys && !closed; i++) {
-			N namespace = namespaceSerializer.deserialize(inView);
-			final int numValues = inView.readInt();
-			Map<K, SV> namespaceMap = new HashMap<>(numValues);
-			stateMap.put(namespace, namespaceMap);
-			for (int j = 0; j < numValues; j++) {
-				K key = keySerializer.deserialize(inView);
-				SV value = stateSerializer.deserialize(inView);
-				namespaceMap.put(key, value);
-			}
-		}
-		return stateMap;
+	@Override
+	@SuppressWarnings("unchecked")
+	public StateTable<K, N, SV> deserialize(
+			String stateName,
+			HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+		final DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length);
+		AbstractMigrationRestoreStrategy<K, N, SV> restoreStrategy =
+				new AbstractMigrationRestoreStrategy<K, N, SV>(keySerializer, namespaceSerializer, stateSerializer) {
+					@Override
+					protected DataInputView openDataInputView() throws IOException {
+						return inView;
+					}
+				};
+		return restoreStrategy.deserialize(stateName, stateBackend);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
new file mode 100644
index 0000000..e572619
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class outlines the general strategy to restore from migration states.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+@Deprecated
+public abstract class AbstractMigrationRestoreStrategy<K, N, S> implements MigrationRestoreSnapshot<K, N, S> {
+
+	/**
+	 * Key serializer.
+	 */
+	protected final TypeSerializer<K> keySerializer;
+
+	/**
+	 * Namespace serializer.
+	 */
+	protected final TypeSerializer<N> namespaceSerializer;
+
+	/**
+	 * Serializer for the state value.
+	 */
+	protected final TypeSerializer<S> stateSerializer;
+
+	public AbstractMigrationRestoreStrategy(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<S> stateSerializer) {
+
+		this.keySerializer = Preconditions.checkNotNull(keySerializer);
+		this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
+		this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
+	}
+
+	@Override
+	public StateTable<K, N, S> deserialize(String stateName, HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+		Preconditions.checkNotNull(stateName, "State name is null. Cannot deserialize snapshot.");
+		Preconditions.checkNotNull(stateBackend, "State backend is null. Cannot deserialize snapshot.");
+
+		final KeyGroupRange keyGroupRange = stateBackend.getKeyGroupRange();
+		Preconditions.checkState(1 == keyGroupRange.getNumberOfKeyGroups(),
+				"Unexpected number of key-groups for restoring from Flink 1.1");
+
+		TypeSerializer<N> patchedNamespaceSerializer = this.namespaceSerializer;
+
+		if (patchedNamespaceSerializer instanceof VoidSerializer) {
+			patchedNamespaceSerializer = (TypeSerializer<N>) VoidNamespaceSerializer.INSTANCE;
+		}
+
+		RegisteredBackendStateMetaInfo<N, S> registeredBackendStateMetaInfo =
+				new RegisteredBackendStateMetaInfo<>(
+						StateDescriptor.Type.UNKNOWN,
+						stateName,
+						patchedNamespaceSerializer,
+						stateSerializer);
+
+		final StateTable<K, N, S> stateTable = stateBackend.newStateTable(registeredBackendStateMetaInfo);
+		final DataInputView inView = openDataInputView();
+		final int keyGroup = keyGroupRange.getStartKeyGroup();
+		final int numNamespaces = inView.readInt();
+
+		for (int i = 0; i < numNamespaces; i++) {
+			N namespace = namespaceSerializer.deserialize(inView);
+			if (null == namespace) {
+				namespace = (N) VoidNamespace.INSTANCE;
+			}
+			final int numKV = inView.readInt();
+			for (int j = 0; j < numKV; j++) {
+				K key = keySerializer.deserialize(inView);
+				S value = stateSerializer.deserialize(inView);
+				stateTable.put(key, keyGroup, namespace, value);
+			}
+		}
+		return stateTable;
+	}
+
+	/**
+	 * Different state handles require different logic to obtain a {@link DataInputView}.
+	 */
+	protected abstract DataInputView openDataInputView() throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
new file mode 100644
index 0000000..ea529db
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.util.Migration;
+
+import java.io.IOException;
+
+@Deprecated
+@Internal
+public interface MigrationRestoreSnapshot<K, N, S> extends Migration {
+	StateTable<K, N, S> deserialize(String stateName, HeapKeyedStateBackend<K> stateBackend) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index aba00f3..1f2f4a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
@@ -254,6 +255,7 @@ public abstract class AbstractKeyedStateBackend<K>
 	/**
 	 * @see KeyedStateBackend
 	 */
+	@Override
 	public KeyGroupRange getKeyGroupRange() {
 		return keyGroupRange;
 	}
@@ -382,4 +384,9 @@ public abstract class AbstractKeyedStateBackend<K>
 	public void close() throws IOException {
 		cancelStreamRegistry.close();
 	}
+
+	@VisibleForTesting
+	public boolean supportsAsynchronousSnapshots() {
+		return false;
+	}
 }
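
A hedged sketch of how a test could branch on the new @VisibleForTesting hook (hypothetical test code, not from this commit; "createKeyedBackend" stands in for whatever factory method the concrete test base provides):

    AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

    if (backend.supportsAsynchronousSnapshots()) {
        // exercise the asynchronous snapshot path; backends that keep the
        // default (false) skip this branch
    }

    backend.close();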

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index f5a6405..0badb41 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -57,11 +57,17 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
 
 	@Override
 	public ArrayList<T> copy(ArrayList<T> from) {
-		ArrayList<T> newList = new ArrayList<>(from.size());
-		for (int i = 0; i < from.size(); i++) {
-			newList.add(elementSerializer.copy(from.get(i)));
+		if (elementSerializer.isImmutableType()) {
+			// fast track using memcopy for immutable types
+			return new ArrayList<>(from);
+		} else {
+			// element-wise deep copy for mutable types
+			ArrayList<T> newList = new ArrayList<>(from.size());
+			for (int i = 0; i < from.size(); i++) {
+				newList.add(elementSerializer.copy(from.get(i)));
+			}
+			return newList;
 		}
-		return newList;
 	}
 
 	@Override

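A self-contained sketch (not part of this commit) demonstrating why the immutable fast track is safe: IntSerializer reports isImmutableType() == true, so a shallow ArrayList copy cannot share mutable element state:

    import java.util.ArrayList;

    import org.apache.flink.api.common.typeutils.base.IntSerializer;
    import org.apache.flink.runtime.state.ArrayListSerializer;

    public class CopyFastPathDemo {
        public static void main(String[] args) {
            ArrayListSerializer<Integer> serializer =
                    new ArrayListSerializer<>(IntSerializer.INSTANCE);

            ArrayList<Integer> original = new ArrayList<>();
            original.add(1);
            original.add(2);

            // takes the fast track: new ArrayList<>(from), no per-element copy
            ArrayList<Integer> copy = serializer.copy(original);

            original.add(3);
            System.out.println(copy.size()); // still 2: the lists are independent
        }
    }
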
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index dbee6cb..5661c38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionMismatchException;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -37,11 +38,12 @@ import java.util.List;
  */
 public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
 
-	private static final int VERSION = 1;
+	public static final int VERSION = 2;
 
 	private TypeSerializerSerializationProxy<?> keySerializerProxy;
 	private List<StateMetaInfo<?, ?>> namedStateSerializationProxies;
 
+	private int restoredVersion;
 	private ClassLoader userCodeClassLoader;
 
 	public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
@@ -51,6 +53,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 	public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, List<StateMetaInfo<?, ?>> namedStateSerializationProxies) {
 		this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
 		this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
+		this.restoredVersion = VERSION;
 		Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
 	}
 
@@ -67,6 +70,22 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 		return VERSION;
 	}
 
+	public int getRestoredVersion() {
+		return restoredVersion;
+	}
+
+	@Override
+	protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
+		super.resolveVersionRead(foundVersion);
+		this.restoredVersion = foundVersion;
+	}
+
+	@Override
+	public boolean isCompatibleVersion(int version) {
+		// we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x)
+		return super.isCompatibleVersion(version) || version == 1;
+	}
+
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		super.write(out);
@@ -96,7 +115,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 		}
 	}
 
-//----------------------------------------------------------------------------------------------------------------------
+	//----------------------------------------------------------------------------------------------------------------------
 
 	/**
 	 * This is the serialization proxy for {@link RegisteredBackendStateMetaInfo} for a single registered state in a

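The version bump cooperates with the two overrides above: resolveVersionRead(int) records the version found on disk, and isCompatibleVersion(int) accepts both layouts. A hedged sketch of restore code branching on the recorded version (the class loader and input stream variables are assumed to be in scope):

    KeyedBackendSerializationProxy serializationProxy =
            new KeyedBackendSerializationProxy(userCodeClassLoader);

    // read() resolves and records the on-disk version via resolveVersionRead()
    serializationProxy.read(new DataInputViewStreamWrapper(checkpointInputStream));

    if (serializationProxy.getRestoredVersion() == 1) {
        // snapshot written by Flink 1.2.x: use the legacy restore path
    } else {
        // current layout (VERSION == 2)
    }
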
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 15e0491..09e27e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -21,13 +21,14 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
 
 /**
  * A keyed state backend provides methods for managing keyed state.
  *
  * @param <K> The key by which state is keyed.
  */
-public interface KeyedStateBackend<K> {
+public interface KeyedStateBackend<K> extends InternalKeyContext<K> {
 
 	/**
 	 * Sets the current key that is used for partitioned state.
@@ -36,31 +37,6 @@ public interface KeyedStateBackend<K> {
 	void setCurrentKey(K newKey);
 
 	/**
-	 * Used by states to access the current key.
-	 */
-	K getCurrentKey();
-
-	/**
-	 * Returns the key-group to which the current key belongs.
-	 */
-	int getCurrentKeyGroupIndex();
-
-	/**
-	 * Returns the number of key-groups aka max parallelism.
-	 */
-	int getNumberOfKeyGroups();
-
-	/**
-	 * Returns the key groups for this backend.
-	 */
-	KeyGroupsList getKeyGroupRange();
-
-	/**
-	 * {@link TypeSerializer} for the state backend key type.
-	 */
-	TypeSerializer<K> getKeySerializer();
-
-	/**
 	 * Creates or retrieves a keyed state backed by this state backend.
 	 *
 	 * @param namespaceSerializer The serializer used for the namespace type of the state

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
new file mode 100644
index 0000000..9e12ee5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+public interface StateTransformationFunction<S, T> {
+	S apply(S previousState, T value) throws Exception;
+}
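
An illustrative implementation (not from this commit): a transformation that folds a new value into the previous state, in the shape that StateTable.transform(...) consumes (see the merge path in AbstractHeapMergingState below):

    import org.apache.flink.runtime.state.StateTransformationFunction;

    public class SumTransformation implements StateTransformationFunction<Long, Long> {

        @Override
        public Long apply(Long previousState, Long value) {
            // previousState is null when no state exists yet for the key/namespace
            return previousState == null ? value : previousState + value;
        }

        public static void main(String[] args) throws Exception {
            StateTransformationFunction<Long, Long> fn = new SumTransformation();
            System.out.println(fn.apply(null, 3L)); // 3
            System.out.println(fn.apply(3L, 4L));   // 7
        }
    }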

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 2e9198f..e27712c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -66,7 +66,10 @@ public class FsStateBackend extends AbstractStateBackend {
 
 	/** State below this size will be stored as part of the metadata, rather than in files */
 	private final int fileStateThreshold;
-	
+
+	/** Switch to choose between synchronous and asynchronous snapshots. */
+	private final boolean asynchronousSnapshots;
+
 	/**
 	 * Creates a new state backend that stores its checkpoint data in the file system and location
 	 * defined by the given URI.
@@ -99,6 +102,27 @@ public class FsStateBackend extends AbstractStateBackend {
 	 *
 	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
 	 *                          and the path to the checkpoint data directory.
+	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+	 *
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 */
+	public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+		this(new Path(checkpointDataUri), asynchronousSnapshots);
+	}
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
 	public FsStateBackend(Path checkpointDataUri) throws IOException {
@@ -118,10 +142,52 @@ public class FsStateBackend extends AbstractStateBackend {
 	 *
 	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
 	 *                          and the path to the checkpoint data directory.
+	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+	 *
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 */
+	public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+		this(checkpointDataUri.toUri(), asynchronousSnapshots);
+	}
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
 	public FsStateBackend(URI checkpointDataUri) throws IOException {
-		this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD);
+		this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, false);
+	}
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
+	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+	 *
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 */
+	public FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+		this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, asynchronousSnapshots);
 	}
 
 	/**
@@ -139,17 +205,47 @@ public class FsStateBackend extends AbstractStateBackend {
 	 *                          and the path to the checkpoint data directory.
 	 * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
 	 *                             rather than in files
-	 * 
+	 *
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds.
 	 */
 	public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException {
+
+		this(checkpointDataUri, fileStateSizeThreshold, false);
+	}
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
+	 * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
+	 *                             rather than in files
+	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+	 *
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 */
+	public FsStateBackend(
+			URI checkpointDataUri,
+			int fileStateSizeThreshold,
+			boolean asynchronousSnapshots) throws IOException {
+
 		checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be zero or larger.");
-		checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD, 
+		checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
 				"The threshold for file state size cannot be larger than %s", MAX_FILE_STATE_THRESHOLD);
 
 		this.fileStateThreshold = fileStateSizeThreshold;
 		this.basePath = validateAndNormalizeUri(checkpointDataUri);
+
+		this.asynchronousSnapshots = asynchronousSnapshots;
 	}
 
 	/**
@@ -166,9 +262,9 @@ public class FsStateBackend extends AbstractStateBackend {
 	 * Gets the threshold below which state is stored as part of the metadata, rather than in files.
 	 * This threshold ensures that the backend does not create a large amount of very small files,
 	 * where potentially the file pointers are larger than the state itself.
-	 * 
+	 *
 	 * <p>By default, this threshold is {@value #DEFAULT_FILE_STATE_THRESHOLD}.
-	 * 
+	 *
 	 * @return The file size threshold, in bytes.
 	 */
 	public int getMinFileSizeThreshold() {
@@ -209,6 +305,7 @@ public class FsStateBackend extends AbstractStateBackend {
 				env.getUserClassLoader(),
 				numberOfKeyGroups,
 				keyGroupRange,
+				asynchronousSnapshots,
 				env.getExecutionConfig());
 	}
 

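A minimal sketch (not part of this commit) of the user-side call that the new constructor overloads enable; the checkpoint URI is a placeholder:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class AsyncFsBackendSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // second argument is the new asynchronousSnapshots switch
            env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints", true));
            // ... define the streaming job, then env.execute(...)
        }
    }
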
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
index 4ac7125..3e76423 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
@@ -22,18 +22,15 @@ import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalMergingState;
 
 import java.util.Collection;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Base class for {@link MergingState} ({@link org.apache.flink.runtime.state.internal.InternalMergingState})
  * that is stored on the heap.
- * 
+ *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <SV> The type of the values in the state.
@@ -45,21 +42,25 @@ public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends Stat
 		implements InternalMergingState<N, IN, OUT> {
 
 	/**
+	 * The merge transformation function that implements the merge logic.
+	 */
+	private final MergeTransformation mergeTransformation;
+
+	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param backend The state backend backing that created this state.
 	 * @param stateDesc The state identifier for the state. This contains the name
 	 *                           and can create a default state value.
 	 * @param stateTable The state table to use in this key/value state. May contain initial state.
 	 */
 	protected AbstractHeapMergingState(
-			KeyedStateBackend<K> backend,
 			SD stateDesc,
 			StateTable<K, N, SV> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
 
-		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+		this.mergeTransformation = new MergeTransformation();
 	}
 
 	@Override
@@ -68,56 +69,40 @@ public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends Stat
 			return; // nothing to do
 		}
 
-		final K key = backend.getCurrentKey();
-		checkState(key != null, "No key set.");
-
-		final Map<N, Map<K, SV>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
-
-		if (namespaceMap != null) {
-			SV merged = null;
-
-			// merge the sources
-			for (N source : sources) {
-				Map<K, SV> keysForNamespace = namespaceMap.get(source);
-				if (keysForNamespace != null) {
-					// get and remove the next source per namespace/key
-					SV sourceState = keysForNamespace.remove(key);
-
-					// if the namespace map became empty, remove 
-					if (keysForNamespace.isEmpty()) {
-						namespaceMap.remove(source);
-					}
-
-					if (merged != null && sourceState != null) {
-						merged = mergeState(merged, sourceState);
-					}
-					else if (merged == null) {
-						merged = sourceState;
-					}
-				}
-			}
+		final StateTable<K, N, SV> map = stateTable;
+
+		SV merged = null;
+
+		// merge the sources
+		for (N source : sources) {
 
-			// merge into the target, if needed
-			if (merged != null) {
-				Map<K, SV> keysForTarget = namespaceMap.get(target);
-				if (keysForTarget == null) {
-					keysForTarget = createNewMap();
-					namespaceMap.put(target, keysForTarget);
-				}
-				SV targetState = keysForTarget.get(key);
-
-				if (targetState != null) {
-					targetState = mergeState(targetState, merged);
-				}
-				else {
-					targetState = merged;
-				}
-				keysForTarget.put(key, targetState);
+			// get and remove the next source per namespace/key
+			SV sourceState = map.removeAndGetOld(source);
+
+			if (merged != null && sourceState != null) {
+				merged = mergeState(merged, sourceState);
+			} else if (merged == null) {
+				merged = sourceState;
 			}
 		}
 
-		// else no entries for that key at all, nothing to do skip
+		// merge into the target, if needed
+		if (merged != null) {
+			map.transform(target, merged, mergeTransformation);
+		}
 	}
 
 	protected abstract SV mergeState(SV a, SV b) throws Exception;
-}
+
+	final class MergeTransformation implements StateTransformationFunction<SV, SV> {
+
+		@Override
+		public SV apply(SV targetState, SV merged) throws Exception {
+			if (targetState != null) {
+				return mergeState(targetState, merged);
+			} else {
+				return merged;
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 18b71de..7e1123d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.heap;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
@@ -25,18 +26,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.util.Preconditions;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * Base class for partitioned {@link ListState} implementations that are backed by a regular
  * heap hash map. The concrete implementations define how the state is checkpointed.
- * 
+ *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <SV> The type of the values in the state.
@@ -53,9 +48,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
 	protected final SD stateDesc;
 
 	/** The current namespace, which the access methods will refer to. */
-	protected N currentNamespace = null;
-
-	protected final KeyedStateBackend<K> backend;
+	protected N currentNamespace;
 
 	protected final TypeSerializer<K> keySerializer;
 
@@ -64,58 +57,28 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param backend The state backend backing that created this state.
 	 * @param stateDesc The state identifier for the state. This contains the name
 	 *                           and can create a default state value.
 	 * @param stateTable The state table to use in this key/value state. May contain initial state.
 	 */
 	protected AbstractHeapState(
-			KeyedStateBackend<K> backend,
 			SD stateDesc,
 			StateTable<K, N, SV> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
 
-		Preconditions.checkNotNull(stateTable, "State table must not be null.");
-
-		this.backend = backend;
 		this.stateDesc = stateDesc;
-		this.stateTable = stateTable;
+		this.stateTable = Preconditions.checkNotNull(stateTable, "State table must not be null.");
 		this.keySerializer = keySerializer;
 		this.namespaceSerializer = namespaceSerializer;
+		this.currentNamespace = null;
 	}
 
 	// ------------------------------------------------------------------------
 
 	@Override
 	public final void clear() {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
-		Map<N, Map<K, SV>> namespaceMap =
-				stateTable.get(backend.getCurrentKeyGroupIndex());
-
-		if (namespaceMap == null) {
-			return;
-		}
-
-		Map<K, SV> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			return;
-		}
-
-		SV removed = keyedMap.remove(backend.getCurrentKey());
-
-		if (removed == null) {
-			return;
-		}
-
-		if (!keyedMap.isEmpty()) {
-			return;
-		}
-
-		namespaceMap.remove(currentNamespace);
+		stateTable.remove(currentNamespace);
 	}
 
 	@Override
@@ -137,20 +100,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
 		Preconditions.checkState(namespace != null, "No namespace given.");
 		Preconditions.checkState(key != null, "No key given.");
 
-		Map<N, Map<K, SV>> namespaceMap =
-				stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
-
-		if (namespaceMap == null) {
-			return null;
-		}
-
-		Map<K, SV> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			return null;
-		}
-
-		SV result = keyedMap.get(key);
+		SV result = stateTable.get(key, namespace);
 
 		if (result == null) {
 			return null;
@@ -158,30 +108,14 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
 
 		@SuppressWarnings("unchecked,rawtypes")
 		TypeSerializer serializer = stateDesc.getSerializer();
-
 		return KvStateRequestSerializer.serializeValue(result, serializer);
 	}
 
 	/**
-	 * Creates a new map for use in Heap based state.
-	 *
-	 * <p>If the state queryable ({@link StateDescriptor#isQueryable()}, this
-	 * will create a concurrent hash map instead of a regular one.
-	 *
-	 * @return A new namespace map.
-	 */
-	protected <MK, MV> Map<MK, MV> createNewMap() {
-		if (stateDesc.isQueryable()) {
-			return new ConcurrentHashMap<>();
-		} else {
-			return new HashMap<>();
-		}
-	}
-
-	/**
 	 * This should only be used for testing.
 	 */
+	@VisibleForTesting
 	public StateTable<K, N, SV> getStateTable() {
 		return stateTable;
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
new file mode 100644
index 0000000..b0d7727
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Abstract base class that encapsulates the logic for taking snapshots of {@link StateTable} implementations and
+ * defines how the snapshot is written during the serialization phase of checkpointing.
+ */
+@Internal
+abstract class AbstractStateTableSnapshot<K, N, S, T extends StateTable<K, N, S>> implements StateTableSnapshot {
+
+	/**
+	 * The {@link StateTable} from which this snapshot was created.
+	 */
+	final T owningStateTable;
+
+	/**
+	 * Creates a new {@link AbstractStateTableSnapshot} for and owned by the given table.
+	 *
+	 * @param owningStateTable the {@link StateTable} for which this object represents a snapshot.
+	 */
+	AbstractStateTableSnapshot(T owningStateTable) {
+		this.owningStateTable = Preconditions.checkNotNull(owningStateTable);
+	}
+
+	/**
+	 * Optional hook to release resources for this snapshot at the end of its lifecycle.
+	 */
+	@Override
+	public void release() {
+	}
+}
\ No newline at end of file


[2/4] flink git commit: [FLINK-5715] Asynchronous snapshots for heap-based keyed state backend

Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index 0360161..f393237 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.Preconditions;
 
@@ -47,35 +45,23 @@ public class HeapMapState<K, N, UK, UV>
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param backend    The state backend backing that created this state.
 	 * @param stateDesc  The state identifier for the state. This contains name
 	 *                   and can create a default state value.
 	 * @param stateTable The state table to use in this key/value state. May contain initial state.
 	 */
-	public HeapMapState(KeyedStateBackend<K> backend,
+	public HeapMapState(
 			MapStateDescriptor<UK, UV> stateDesc,
 			StateTable<K, N, HashMap<UK, UV>> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
-		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
 	@Override
 	public UV get(UK userKey) {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
 
-		Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
-		if (namespaceMap == null) {
-			return null;
-		}
-
-		Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
-		if (keyedMap == null) {
-			return null;
-		}
+		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
 
-		HashMap<UK, UV> userMap = keyedMap.get(backend.<K>getCurrentKey());
 		if (userMap == null) {
 			return null;
 		}
@@ -85,25 +71,11 @@ public class HeapMapState<K, N, UK, UV>
 
 	@Override
 	public void put(UK userKey, UV userValue) {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
-		Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
-		if (namespaceMap == null) {
-			namespaceMap = createNewMap();
-			stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
-		}
-
-		Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
-		if (keyedMap == null) {
-			keyedMap = createNewMap();
-			namespaceMap.put(currentNamespace, keyedMap);
-		}
 
-		HashMap<UK, UV> userMap = keyedMap.get(backend.getCurrentKey());
+		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
 		if (userMap == null) {
 			userMap = new HashMap<>();
-			keyedMap.put(backend.getCurrentKey(), userMap);
+			stateTable.put(currentNamespace, userMap);
 		}
 
 		userMap.put(userKey, userValue);
@@ -111,52 +83,27 @@ public class HeapMapState<K, N, UK, UV>
 
 	@Override
 	public void putAll(Map<UK, UV> value) {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
 
-		Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
-		if (namespaceMap == null) {
-			namespaceMap = createNewMap();
-			stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
-		}
-
-		Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
-		if (keyedMap == null) {
-			keyedMap = createNewMap();
-			namespaceMap.put(currentNamespace, keyedMap);
-		}
+		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
 
-		HashMap<UK, UV> userMap = keyedMap.get(backend.getCurrentKey());
 		if (userMap == null) {
 			userMap = new HashMap<>();
-			keyedMap.put(backend.getCurrentKey(), userMap);
+			stateTable.put(currentNamespace, userMap);
 		}
 
 		userMap.putAll(value);
 	}
-	
+
 	@Override
 	public void remove(UK userKey) {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
-		Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
-		if (namespaceMap == null) {
-			return;
-		}
 
-		Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
-		if (keyedMap == null) {
-			return;
-		}
-
-		HashMap<UK, UV> userMap = keyedMap.get(backend.getCurrentKey());
+		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
 		if (userMap == null) {
 			return;
 		}
 
 		userMap.remove(userKey);
-		
+
 		if (userMap.isEmpty()) {
 			clear();
 		}
@@ -164,101 +111,31 @@ public class HeapMapState<K, N, UK, UV>
 
 	@Override
 	public boolean contains(UK userKey) {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
-		Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
-		if (namespaceMap == null) {
-			return false;
-		}
-
-		Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
-		if (keyedMap == null) {
-			return false;
-		}
-
-		HashMap<UK, UV> userMap = keyedMap.get(backend.<K>getCurrentKey());
-		
+		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
 		return userMap != null && userMap.containsKey(userKey);
 	}
 
 	@Override
 	public Iterable<Map.Entry<UK, UV>> entries() {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
-		Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
-		if (namespaceMap == null) {
-			return null;
-		}
-
-		Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
-		if (keyedMap == null) {
-			return null;
-		}
-
-		HashMap<UK, UV> userMap = keyedMap.get(backend.<K>getCurrentKey());
-
+		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
 		return userMap == null ? null : userMap.entrySet();
 	}
 	
 	@Override
 	public Iterable<UK> keys() {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
-		Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
-		if (namespaceMap == null) {
-			return null;
-		}
-
-		Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
-		if (keyedMap == null) {
-			return null;
-		}
-
-		HashMap<UK, UV> userMap = keyedMap.get(backend.<K>getCurrentKey());
-
+		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
 		return userMap == null ? null : userMap.keySet();
 	}
 
 	@Override
 	public Iterable<UV> values() {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
-		Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
-		if (namespaceMap == null) {
-			return null;
-		}
-
-		Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
-		if (keyedMap == null) {
-			return null;
-		}
-
-		HashMap<UK, UV> userMap = keyedMap.get(backend.<K>getCurrentKey());
-
+		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
 		return userMap == null ? null : userMap.values();
 	}
 
 	@Override
 	public Iterator<Map.Entry<UK, UV>> iterator() {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
-		Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
-		if (namespaceMap == null) {
-			return null;
-		}
-
-		Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace);
-		if (keyedMap == null) {
-			return null;
-		}
-
-		HashMap<UK, UV> userMap = keyedMap.get(backend.<K>getCurrentKey());
-
+		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
 		return userMap == null ? null : userMap.entrySet().iterator();
 	}
 
@@ -267,22 +144,12 @@ public class HeapMapState<K, N, UK, UV>
 		Preconditions.checkState(namespace != null, "No namespace given.");
 		Preconditions.checkState(key != null, "No key given.");
 
-		Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
-
-		if (namespaceMap == null) {
-			return null;
-		}
+		HashMap<UK, UV> result = stateTable.get(key, namespace);
 
-		Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(namespace);
-		if (keyedMap == null) {
+		if (null == result) {
 			return null;
 		}
 
-		HashMap<UK, UV> result = keyedMap.get(key);
-		if (result == null) {
-			return null;
-		}
-		
 		TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
 		TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
 

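The pattern in all of these hunks is the same: the per-key-group, per-namespace, per-key map navigation (plus the explicit key/namespace precondition checks) moves into the StateTable, and the state classes issue a single namespace-scoped lookup. A self-contained sketch of the resulting null-propagating lookup, with plain JDK maps standing in for the state table (all names illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class MapStateLookupSketch {
        public static void main(String[] args) {
            // one user map per namespace, as resolved by stateTable.get(currentNamespace)
            Map<String, HashMap<String, Integer>> byNamespace = new HashMap<>();
            byNamespace.computeIfAbsent("window-1", ns -> new HashMap<>()).put("count", 7);

            HashMap<String, Integer> userMap = byNamespace.get("window-1");
            Integer value = (userMap == null) ? null : userMap.get("count"); // HeapMapState#get behavior
            System.out.println(value); // 7
        }
    }
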
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
index 090a660..6e11327 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -22,17 +22,16 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.util.Map;
 
 /**
  * Heap-backed partitioned {@link org.apache.flink.api.common.state.ReducingState} that is
  * snapshotted into files.
- * 
+ *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of the value.
@@ -41,25 +40,23 @@ public class HeapReducingState<K, N, V>
 		extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>, ReducingStateDescriptor<V>>
 		implements InternalReducingState<N, V> {
 
-	private final ReduceFunction<V> reduceFunction;
+	private final ReduceTransformation<V> reduceTransformation;
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param backend The state backend backing that created this state.
 	 * @param stateDesc The state identifier for the state. This contains name
 	 *                           and can create a default state value.
 	 * @param stateTable The state table to use in this key/value state. May contain initial state.
 	 */
 	public HeapReducingState(
-			KeyedStateBackend<K> backend,
 			ReducingStateDescriptor<V> stateDesc,
 			StateTable<K, N, V> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
 
-		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
-		this.reduceFunction = stateDesc.getReduceFunction();
+		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+		this.reduceTransformation = new ReduceTransformation<>(stateDesc.getReduceFunction());
 	}
 
 	// ------------------------------------------------------------------------
@@ -68,62 +65,21 @@ public class HeapReducingState<K, N, V>
 
 	@Override
 	public V get() {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
-		Map<N, Map<K, V>> namespaceMap =
-				stateTable.get(backend.getCurrentKeyGroupIndex());
-
-		if (namespaceMap == null) {
-			return null;
-		}
-
-		Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			return null;
-		}
-
-		return keyedMap.get(backend.<K>getCurrentKey());
+		return stateTable.get(currentNamespace);
 	}
 
 	@Override
 	public void add(V value) throws IOException {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
 
 		if (value == null) {
 			clear();
 			return;
 		}
 
-		Map<N, Map<K, V>> namespaceMap =
-				stateTable.get(backend.getCurrentKeyGroupIndex());
-
-		if (namespaceMap == null) {
-			namespaceMap = createNewMap();
-			stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
-		}
-
-		Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			keyedMap = createNewMap();
-			namespaceMap.put(currentNamespace, keyedMap);
-		}
-
-		V currentValue = keyedMap.put(backend.<K>getCurrentKey(), value);
-
-		if (currentValue == null) {
-			// we're good, just added the new value
-		} else {
-			V reducedValue;
-			try {
-				reducedValue = reduceFunction.reduce(currentValue, value);
-			} catch (Exception e) {
-				throw new IOException("Exception while applying ReduceFunction in reducing state", e);
-			}
-			keyedMap.put(backend.<K>getCurrentKey(), reducedValue);
+		try {
+			stateTable.transform(currentNamespace, value, reduceTransformation);
+		} catch (Exception e) {
+			throw new IOException("Exception while applying ReduceFunction in reducing state", e);
 		}
 	}
 
@@ -133,6 +89,20 @@ public class HeapReducingState<K, N, V>
 
 	@Override
 	protected V mergeState(V a, V b) throws Exception {
-		return reduceFunction.reduce(a, b);
+		return reduceTransformation.apply(a, b);
+	}
+
+	static final class ReduceTransformation<V> implements StateTransformationFunction<V, V> {
+
+		private final ReduceFunction<V> reduceFunction;
+
+		ReduceTransformation(ReduceFunction<V> reduceFunction) {
+			this.reduceFunction = Preconditions.checkNotNull(reduceFunction);
+		}
+
+		@Override
+		public V apply(V previousState, V value) throws Exception {
+			return previousState != null ? reduceFunction.reduce(previousState, value) : value;
+		}
 	}
-}
+}
\ No newline at end of file

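HeapReducingState#add now delegates the fold to StateTable#transform with the ReduceTransformation above: the first value becomes the state, and every later value is reduced into the previous state in a single table access. A runnable sketch of that contract (the interface is re-declared locally only to keep the example self-contained):

    public class ReduceTransformSketch {

        interface StateTransformationFunction<S, T> {
            S apply(S previousState, T value) throws Exception;
        }

        public static void main(String[] args) throws Exception {
            StateTransformationFunction<Integer, Integer> sum =
                    (prev, value) -> prev != null ? prev + value : value;

            Integer state = null;
            state = sum.apply(state, 4); // first value: no previous state -> 4
            state = sum.apply(state, 5); // reduce(4, 5) -> 9
            System.out.println(state);   // 9
        }
    }
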
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
index 9e042fe..6de62a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
@@ -21,16 +21,12 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Map;
 
 /**
  * Heap-backed partitioned {@link org.apache.flink.api.common.state.ValueState} that is snapshotted
  * into files.
- * 
+ *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of the value.
@@ -42,39 +38,21 @@ public class HeapValueState<K, N, V>
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param backend The state backend backing that created this state.
 	 * @param stateDesc The state identifier for the state. This contains name
 	 *                           and can create a default state value.
 	 * @param stateTable The state table to use in this key/value state. May contain initial state.
 	 */
 	public HeapValueState(
-			KeyedStateBackend<K> backend,
 			ValueStateDescriptor<V> stateDesc,
 			StateTable<K, N, V> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
-		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
 	@Override
 	public V value() {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
-		Map<N, Map<K, V>> namespaceMap =
-				stateTable.get(backend.getCurrentKeyGroupIndex());
-
-		if (namespaceMap == null) {
-			return stateDesc.getDefaultValue();
-		}
-
-		Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			return stateDesc.getDefaultValue();
-		}
-
-		V result = keyedMap.get(backend.<K>getCurrentKey());
+		final V result = stateTable.get(currentNamespace);
 
 		if (result == null) {
 			return stateDesc.getDefaultValue();
@@ -85,29 +63,12 @@ public class HeapValueState<K, N, V>
 
 	@Override
 	public void update(V value) {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
 
 		if (value == null) {
 			clear();
 			return;
 		}
 
-		Map<N, Map<K, V>> namespaceMap =
-				stateTable.get(backend.getCurrentKeyGroupIndex());
-
-		if (namespaceMap == null) {
-			namespaceMap = createNewMap();
-			stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
-		}
-
-		Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			keyedMap = createNewMap();
-			namespaceMap.put(currentNamespace, keyedMap);
-		}
-
-		keyedMap.put(backend.<K>getCurrentKey(), value);
+		stateTable.put(currentNamespace, value);
 	}
 }

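HeapValueState#value keeps its previous semantics under the new API: a missing mapping falls back to the descriptor's default value, and update(null) clears the state. The fallback rule in isolation (illustrative):

    public class ValueStateDefaultSketch {
        static <V> V valueOrDefault(V stored, V defaultValue) {
            return stored != null ? stored : defaultValue; // HeapValueState#value fallback
        }

        public static void main(String[] args) {
            System.out.println(valueOrDefault(null, 0)); // 0, the descriptor default
            System.out.println(valueOrDefault(42, 0));   // 42, the stored state
        }
    }
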
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContext.java
new file mode 100644
index 0000000..cb0582b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+
+/**
+ * This interface provides the current context of a keyed state backend: the currently selected key, the
+ * corresponding key-group, and other key and key-grouping related information.
+ * <p>
+ * The typical use case for this interface is providing a view on the current-key selection aspects of
+ * {@link org.apache.flink.runtime.state.KeyedStateBackend}.
+ */
+@Internal
+public interface InternalKeyContext<K> {
+
+	/**
+	 * Used by states to access the current key.
+	 */
+	K getCurrentKey();
+
+	/**
+	 * Returns the key-group to which the current key belongs.
+	 */
+	int getCurrentKeyGroupIndex();
+
+	/**
+	 * Returns the number of key-groups, aka the max parallelism.
+	 */
+	int getNumberOfKeyGroups();
+
+	/**
+	 * Returns the key groups for this backend.
+	 */
+	KeyGroupRange getKeyGroupRange();
+
+	/**
+	 * {@link TypeSerializer} for the state backend key type.
+	 */
+	TypeSerializer<K> getKeySerializer();
+
+}

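A trivial implementation of this interface is enough for unit tests (the commit's own tests use a MockInternalKeyContext for this purpose); a sketch for a backend owning the single key group [0, 0], with all names illustrative and the same package assumed:

    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.runtime.state.KeyGroupRange;

    // Illustrative single-key-group context for tests.
    class SingleKeyGroupContext<K> implements InternalKeyContext<K> {

        private final TypeSerializer<K> keySerializer;
        private K currentKey;

        SingleKeyGroupContext(TypeSerializer<K> keySerializer) {
            this.keySerializer = keySerializer;
        }

        public void setCurrentKey(K key) { this.currentKey = key; }

        @Override public K getCurrentKey() { return currentKey; }
        @Override public int getCurrentKeyGroupIndex() { return 0; }
        @Override public int getNumberOfKeyGroups() { return 1; }
        @Override public KeyGroupRange getKeyGroupRange() { return new KeyGroupRange(0, 0); }
        @Override public TypeSerializer<K> getKeySerializer() { return keySerializer; }
    }
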
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
new file mode 100644
index 0000000..22f344d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This implementation of {@link StateTable} uses nested {@link HashMap} objects. It also maintains a partitioning
+ * by key-group.
+ * <p>
+ * In contrast to {@link CopyOnWriteStateTable}, this implementation does not support asynchronous snapshots. However,
+ * it might have a better memory footprint for some use cases, e.g. it naturally de-duplicates namespace objects.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+@Internal
+public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
+
+	/**
+	 * Map for holding the actual state objects. The outer array represents the key-groups. The nested maps provide
+	 * an outer scope by namespace and an inner scope by key.
+	 */
+	private final Map<N, Map<K, S>>[] state;
+
+	/**
+	 * The offset to the contiguous key groups.
+	 */
+	private final int keyGroupOffset;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new {@link NestedMapsStateTable} for the given key context and meta info.
+	 *
+	 * @param keyContext the key context.
+	 * @param metaInfo the meta information for this state table.
+	 */
+	public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+		super(keyContext, metaInfo);
+		this.keyGroupOffset = keyContext.getKeyGroupRange().getStartKeyGroup();
+
+		@SuppressWarnings("unchecked")
+		Map<N, Map<K, S>>[] state = (Map<N, Map<K, S>>[]) new Map[keyContext.getNumberOfKeyGroups()];
+		this.state = state;
+	}
+
+	// ------------------------------------------------------------------------
+	//  access to maps
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns the internal data structure.
+	 */
+	@VisibleForTesting
+	public Map<N, Map<K, S>>[] getState() {
+		return state;
+	}
+
+	@VisibleForTesting
+	Map<N, Map<K, S>> getMapForKeyGroup(int keyGroupIndex) {
+		final int pos = indexToOffset(keyGroupIndex);
+		if (pos >= 0 && pos < state.length) {
+			return state[pos];
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Sets the given map for the given key-group.
+	 */
+	private void setMapForKeyGroup(int keyGroupId, Map<N, Map<K, S>> map) {
+		try {
+			state[indexToOffset(keyGroupId)] = map;
+		} catch (ArrayIndexOutOfBoundsException e) {
+			throw new IllegalArgumentException("Key group index out of range of key group range [" +
+					keyGroupOffset + ", " + (keyGroupOffset + state.length) + ").");
+		}
+	}
+
+	/**
+	 * Translates a key-group id to the internal array offset.
+	 */
+	private int indexToOffset(int index) {
+		return index - keyGroupOffset;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int size() {
+		int count = 0;
+		for (Map<N, Map<K, S>> namespaceMap : state) {
+			if (null != namespaceMap) {
+				for (Map<K, S> keyMap : namespaceMap.values()) {
+					if (null != keyMap) {
+						count += keyMap.size();
+					}
+				}
+			}
+		}
+		return count;
+	}
+
+	@Override
+	public S get(N namespace) {
+		return get(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);
+	}
+
+	@Override
+	public boolean containsKey(N namespace) {
+		return containsKey(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);
+	}
+
+	@Override
+	public void put(N namespace, S state) {
+		put(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace, state);
+	}
+
+	@Override
+	public S putAndGetOld(N namespace, S state) {
+		return putAndGetOld(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace, state);
+	}
+
+	@Override
+	public void remove(N namespace) {
+		remove(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);
+	}
+
+	@Override
+	public S removeAndGetOld(N namespace) {
+		return removeAndGetOld(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);
+	}
+
+	@Override
+	public S get(K key, N namespace) {
+		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, keyContext.getNumberOfKeyGroups());
+		return get(key, keyGroup, namespace);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private boolean containsKey(K key, int keyGroupIndex, N namespace) {
+
+		checkKeyNamespacePreconditions(key, namespace);
+
+		Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
+
+		if (namespaceMap == null) {
+			return false;
+		}
+
+		Map<K, S> keyedMap = namespaceMap.get(namespace);
+
+		return keyedMap != null && keyedMap.containsKey(key);
+	}
+
+	S get(K key, int keyGroupIndex, N namespace) {
+
+		checkKeyNamespacePreconditions(key, namespace);
+
+		Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
+
+		if (namespaceMap == null) {
+			return null;
+		}
+
+		Map<K, S> keyedMap = namespaceMap.get(namespace);
+
+		if (keyedMap == null) {
+			return null;
+		}
+
+		return keyedMap.get(key);
+	}
+
+	@Override
+	public void put(K key, int keyGroupIndex, N namespace, S value) {
+		putAndGetOld(key, keyGroupIndex, namespace, value);
+	}
+
+	private S putAndGetOld(K key, int keyGroupIndex, N namespace, S value) {
+
+		checkKeyNamespacePreconditions(key, namespace);
+
+		Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
+
+		if (namespaceMap == null) {
+			namespaceMap = new HashMap<>();
+			setMapForKeyGroup(keyGroupIndex, namespaceMap);
+		}
+
+		Map<K, S> keyedMap = namespaceMap.get(namespace);
+
+		if (keyedMap == null) {
+			keyedMap = new HashMap<>();
+			namespaceMap.put(namespace, keyedMap);
+		}
+
+		return keyedMap.put(key, value);
+	}
+
+	private void remove(K key, int keyGroupIndex, N namespace) {
+		removeAndGetOld(key, keyGroupIndex, namespace);
+	}
+
+	private S removeAndGetOld(K key, int keyGroupIndex, N namespace) {
+
+		checkKeyNamespacePreconditions(key, namespace);
+
+		Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
+
+		if (namespaceMap == null) {
+			return null;
+		}
+
+		Map<K, S> keyedMap = namespaceMap.get(namespace);
+
+		if (keyedMap == null) {
+			return null;
+		}
+
+		S removed = keyedMap.remove(key);
+
+		if (keyedMap.isEmpty()) {
+			namespaceMap.remove(namespace);
+		}
+
+		return removed;
+	}
+
+	private void checkKeyNamespacePreconditions(K key, N namespace) {
+		Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
+		Preconditions.checkNotNull(namespace, "Provided namespace is null.");
+	}
+
+	@Override
+	public int sizeOfNamespace(Object namespace) {
+		int count = 0;
+		for (Map<N, Map<K, S>> namespaceMap : state) {
+			if (null != namespaceMap) {
+				Map<K, S> keyMap = namespaceMap.get(namespace);
+				count += keyMap != null ? keyMap.size() : 0;
+			}
+		}
+
+		return count;
+	}
+
+	@Override
+	public <T> void transform(N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception {
+		final K key = keyContext.getCurrentKey();
+		checkKeyNamespacePreconditions(key, namespace);
+		final int keyGroupIndex = keyContext.getCurrentKeyGroupIndex();
+
+		Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
+
+		if (namespaceMap == null) {
+			namespaceMap = new HashMap<>();
+			setMapForKeyGroup(keyGroupIndex, namespaceMap);
+		}
+
+		Map<K, S> keyedMap = namespaceMap.get(namespace);
+
+		if (keyedMap == null) {
+			keyedMap = new HashMap<>();
+			namespaceMap.put(namespace, keyedMap);
+		}
+
+		keyedMap.put(key, transformation.apply(keyedMap.get(key), value));
+	}
+
+	// snapshots ---------------------------------------------------------------------------------------------------
+
+	private static <K, N, S> int countMappingsInKeyGroup(final Map<N, Map<K, S>> keyGroupMap) {
+		int count = 0;
+		for (Map<K, S> namespaceMap : keyGroupMap.values()) {
+			count += namespaceMap.size();
+		}
+
+		return count;
+	}
+
+	@Override
+	public NestedMapsStateTableSnapshot<K, N, S> createSnapshot() {
+		return new NestedMapsStateTableSnapshot<>(this);
+	}
+
+	/**
+	 * This class encapsulates the snapshot logic.
+	 *
+	 * @param <K> type of key.
+	 * @param <N> type of namespace.
+	 * @param <S> type of state.
+	 */
+	static class NestedMapsStateTableSnapshot<K, N, S>
+			extends AbstractStateTableSnapshot<K, N, S, NestedMapsStateTable<K, N, S>> {
+
+		NestedMapsStateTableSnapshot(NestedMapsStateTable<K, N, S> owningTable) {
+			super(owningTable);
+		}
+
+		/**
+		 * Implementation note: we currently use the same format for {@link NestedMapsStateTable} and
+		 * {@link CopyOnWriteStateTable}.
+		 * <p>
+		 * {@link NestedMapsStateTable} could naturally support a kind of
+		 * prefix-compressed format (grouping by namespace, writing the namespace only once per group instead of once
+		 * for each mapping). We might implement support for different formats later (tailored towards different state table
+		 * implementations).
+		 */
+		@Override
+		public void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException {
+			final Map<N, Map<K, S>> keyGroupMap = owningStateTable.getMapForKeyGroup(keyGroupId);
+			if (null != keyGroupMap) {
+				TypeSerializer<K> keySerializer = owningStateTable.keyContext.getKeySerializer();
+				TypeSerializer<N> namespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer();
+				TypeSerializer<S> stateSerializer = owningStateTable.metaInfo.getStateSerializer();
+				dov.writeInt(countMappingsInKeyGroup(keyGroupMap));
+				for (Map.Entry<N, Map<K, S>> namespaceEntry : keyGroupMap.entrySet()) {
+					final N namespace = namespaceEntry.getKey();
+					final Map<K, S> namespaceMap = namespaceEntry.getValue();
+
+					for (Map.Entry<K, S> keyEntry : namespaceMap.entrySet()) {
+						namespaceSerializer.serialize(namespace, dov);
+						keySerializer.serialize(keyEntry.getKey(), dov);
+						stateSerializer.serialize(keyEntry.getValue(), dov);
+					}
+				}
+			} else {
+				dov.writeInt(0);
+			}
+		}
+	}
+}

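The keyGroupOffset bookkeeping above means the table addresses its array relative to the first key group owned by the backend: key group id g lands at slot g - keyGroupOffset. A standalone sketch of that translation (the array is sized to the owned range here purely for illustration):

    public class KeyGroupOffsetSketch {
        public static void main(String[] args) {
            int startKeyGroup = 32, endKeyGroup = 63;           // this backend's key-group range
            Object[] state = new Object[endKeyGroup - startKeyGroup + 1];

            int keyGroupId = 40;                                // absolute key-group id
            int offset = keyGroupId - startKeyGroup;            // what indexToOffset computes
            state[offset] = "state for key group 40";
            System.out.println(state[8]);                       // same slot: 40 - 32 == 8
        }
    }
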
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateEntry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateEntry.java
new file mode 100644
index 0000000..8e29cb2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateEntry.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+/**
+ * Interface of entries in a state table. Entries are triples of key, namespace, and state.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+public interface StateEntry<K, N, S> {
+
+	/**
+	 * Returns the key of this entry.
+	 */
+	K getKey();
+
+	/**
+	 * Returns the namespace of this entry.
+	 */
+	N getNamespace();
+
+	/**
+	 * Returns the state of this entry.
+	 */
+	S getState();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 21265f4..62fc869 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,72 +15,152 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
-import org.apache.flink.runtime.state.KeyGroupRange;
-
-import java.util.Map;
-
-public class StateTable<K, N, ST> {
-
-	/** Map for holding the actual state objects. */
-	private final Map<N, Map<K, ST>>[] state;
-
-	/** The offset to the contiguous key groups */
-	private final int keyGroupOffset;
-
-	/** Combined meta information such as name and serializers for this state */
-	private RegisteredBackendStateMetaInfo<N, ST> metaInfo;
-
-	// ------------------------------------------------------------------------
-	public StateTable(RegisteredBackendStateMetaInfo<N, ST> metaInfo, KeyGroupRange keyGroupRange) {
-		this.metaInfo = metaInfo;
-		this.keyGroupOffset = keyGroupRange.getStartKeyGroup();
-
-		@SuppressWarnings("unchecked")
-		Map<N, Map<K, ST>>[] state = (Map<N, Map<K, ST>>[]) new Map[keyGroupRange.getNumberOfKeyGroups()];
-		this.state = state;
-	}
-
-	// ------------------------------------------------------------------------
-	//  access to maps
-	// ------------------------------------------------------------------------
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.util.Preconditions;
 
-	public Map<N, Map<K, ST>>[] getState() {
-		return state;
-	}
-
-	public Map<N, Map<K, ST>> get(int index) {
-		final int pos = indexToOffset(index);
-		if (pos >= 0 && pos < state.length) {
-			return state[pos];
-		} else {
-			return null;
-		}
-	}
-
-	public void set(int index, Map<N, Map<K, ST>> map) {
-		try {
-			state[indexToOffset(index)] = map;
-		}
-		catch (ArrayIndexOutOfBoundsException e) {
-			throw new IllegalArgumentException("Key group index out of range of key group range [" +
-					keyGroupOffset + ", " + (keyGroupOffset + state.length) + ").");
-		}
+/**
+ * Base class for state tables. Accesses to state are typically scoped by the currently active key, as provided
+ * through the {@link InternalKeyContext}.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public abstract class StateTable<K, N, S> {
+
+	/**
+	 * The key context view on the backend. This provides information such as the currently active key.
+	 */
+	protected final InternalKeyContext<K> keyContext;
+
+	/**
+	 * Combined meta information such as name and serializers for this state.
+	 */
+	protected RegisteredBackendStateMetaInfo<N, S> metaInfo;
+
+	/**
+	 * Creates a new {@link StateTable} for the given key context and meta info.
+	 *
+	 * @param keyContext the key context that provides the key scope for all put/get/delete operations.
+	 * @param metaInfo the meta information, including the type serializer for state copy-on-write.
+	 */
+	public StateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+		this.keyContext = Preconditions.checkNotNull(keyContext);
+		this.metaInfo = Preconditions.checkNotNull(metaInfo);
 	}
 
-	private int indexToOffset(int index) {
-		return index - keyGroupOffset;
+	// Main interface methods of StateTable -------------------------------------------------------
+
+	/**
+	 * Returns whether this {@link StateTable} is empty.
+	 *
+	 * @return {@code true} if this {@link StateTable} has no elements, {@code false}
+	 * otherwise.
+	 * @see #size()
+	 */
+	public boolean isEmpty() {
+		return size() == 0;
 	}
 
-	// ------------------------------------------------------------------------
-	//  metadata
-	// ------------------------------------------------------------------------
-	
-	public TypeSerializer<ST> getStateSerializer() {
+	/**
+	 * Returns the total number of entries in this {@link StateTable}.
+	 *
+	 * @return the number of entries in this {@link StateTable}.
+	 */
+	public abstract int size();
+
+	/**
+	 * Returns the state of the mapping for the composite of active key and given namespace.
+	 *
+	 * @param namespace the namespace. Not null.
+	 * @return the states of the mapping with the specified key/namespace composite key, or {@code null}
+	 * if no mapping for the specified key is found.
+	 */
+	public abstract S get(N namespace);
+
+	/**
+	 * Returns whether this table contains a mapping for the composite of active key and given namespace.
+	 *
+	 * @param namespace the namespace in the composite key to search for. Not null.
+	 * @return {@code true} if this map contains the specified key/namespace composite key,
+	 * {@code false} otherwise.
+	 */
+	public abstract boolean containsKey(N namespace);
+
+	/**
+	 * Maps the composite of active key and given namespace to the specified state. This method should be preferred
+	 * over {@link #putAndGetOld(N, S)} when the caller is not interested in the old state.
+	 *
+	 * @param namespace the namespace. Not null.
+	 * @param state     the state. Can be null.
+	 */
+	public abstract void put(N namespace, S state);
+
+	/**
+	 * Maps the composite of active key and given namespace to the specified state. Returns the previous state that
+	 * was registered under the composite key.
+	 *
+	 * @param namespace the namespace. Not null.
+	 * @param state     the state. Can be null.
+	 * @return the state of any previous mapping with the specified key or
+	 * {@code null} if there was no such mapping.
+	 */
+	public abstract S putAndGetOld(N namespace, S state);
+
+	/**
+	 * Removes the mapping for the composite of active key and given namespace. This method should be preferred
+	 * over {@link #removeAndGetOld(N)} when the caller is not interested in the old state.
+	 *
+	 * @param namespace the namespace of the mapping to remove. Not null.
+	 */
+	public abstract void remove(N namespace);
+
+	/**
+	 * Removes the mapping for the composite of active key and given namespace, returning the state that was
+	 * found under the entry.
+	 *
+	 * @param namespace the namespace of the mapping to remove. Not null.
+	 * @return the state of the removed mapping or {@code null} if no mapping
+	 * for the specified key was found.
+	 */
+	public abstract S removeAndGetOld(N namespace);
+
+	/**
+	 * Applies the given {@link StateTransformationFunction} to the state (1st input argument), using the given value as
+	 * second input argument. The result of {@link StateTransformationFunction#apply(Object, Object)} is then stored as
+	 * the new state. This function is basically an optimization of the get-update-put pattern.
+	 *
+	 * @param namespace      the namespace. Not null.
+	 * @param value          the value to use in transforming the state. Can be null.
+	 * @param transformation the transformation function.
+	 * @throws Exception if some exception happens in the transformation function.
+	 */
+	public abstract <T> void transform(
+			N namespace,
+			T value,
+			StateTransformationFunction<S, T> transformation) throws Exception;
+
+	// For queryable state ------------------------------------------------------------------------
+
+	/**
+	 * Returns the state for the composite of active key and given namespace. This is typically used by
+	 * queryable state.
+	 *
+	 * @param key       the key. Not null.
+	 * @param namespace the namespace. Not null.
+	 * @return the state of the mapping with the specified key/namespace composite key, or {@code null}
+	 * if no mapping for the specified key is found.
+	 */
+	public abstract S get(K key, N namespace);
+
+	// Meta data setter / getter and toString -----------------------------------------------------
+
+	public TypeSerializer<S> getStateSerializer() {
 		return metaInfo.getStateSerializer();
 	}
 
@@ -88,28 +168,22 @@ public class StateTable<K, N, ST> {
 		return metaInfo.getNamespaceSerializer();
 	}
 
-	public RegisteredBackendStateMetaInfo<N, ST> getMetaInfo() {
+	public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() {
 		return metaInfo;
 	}
 
-	public void setMetaInfo(RegisteredBackendStateMetaInfo<N, ST> metaInfo) {
+	public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) {
 		this.metaInfo = metaInfo;
 	}
 
-	// ------------------------------------------------------------------------
-	//  for testing
-	// ------------------------------------------------------------------------
+	// Snapshot / Restore -------------------------------------------------------------------------
+
+	abstract StateTableSnapshot createSnapshot();
+
+	public abstract void put(K key, int keyGroup, N namespace, S state);
+
+	// For testing --------------------------------------------------------------------------------
 
 	@VisibleForTesting
-	boolean isEmpty() {
-		for (Map<N, Map<K, ST>> map : state) {
-			if (map != null) {
-				if (!map.isEmpty()) {
-					return false;
-				}
-			}
-		}
-
-		return true;
-	}
-}
+	public abstract int sizeOfNamespace(Object namespace);
+}
\ No newline at end of file

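The transform(...) contract added here is the single-access equivalent of a get-update-put sequence: conceptually, transform(ns, v, f) stores f.apply(get(ns), v). A sketch with a plain map standing in for the table (illustrative names only):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.BiFunction;

    public class TransformSketch {
        public static void main(String[] args) {
            Map<String, Long> table = new HashMap<>();
            BiFunction<Long, Long, Long> f = (prev, v) -> prev == null ? v : prev + v;

            for (long v : new long[]{3, 4}) {
                // the get-update-put sequence that transform() folds into one table access
                table.put("ns", f.apply(table.get("ns"), v));
            }
            System.out.println(table.get("ns")); // 7
        }
    }
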
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java
new file mode 100644
index 0000000..659c174
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.IOException;
+
+/**
+ * Interface for state de-serialization into {@link StateTable}s by key-group.
+ */
+interface StateTableByKeyGroupReader {
+
+	/**
+	 * Read the data for the specified key-group from the input.
+	 *
+	 * @param div        the input
+	 * @param keyGroupId the key-group to read
+	 * @throws IOException on read related problems
+	 */
+	void readMappingsInKeyGroup(DataInputView div, int keyGroupId) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
new file mode 100644
index 0000000..53ec349
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.IOException;
+
+/**
+ * This class provides a static factory method to create different implementations of {@link StateTableByKeyGroupReader}
+ * depending on the provided serialization format version.
+ * <p>
+ * The implementations are also located here as inner classes.
+ */
+class StateTableByKeyGroupReaders {
+
+	/**
+	 * Creates a new StateTableByKeyGroupReader that inserts de-serialized mappings into the given table, using the
+	 * de-serialization algorithm that matches the given version.
+	 *
+	 * @param table the {@link StateTable} into which de-serialized mappings are inserted.
+	 * @param version version for the de-serialization algorithm.
+	 * @param <K> type of key.
+	 * @param <N> type of namespace.
+	 * @param <S> type of state.
+	 * @return the appropriate reader.
+	 */
+	static <K, N, S> StateTableByKeyGroupReader readerForVersion(StateTable<K, N, S> table, int version) {
+		switch (version) {
+			case 1:
+				return new StateTableByKeyGroupReaderV1<>(table);
+			case 2:
+				return new StateTableByKeyGroupReaderV2<>(table);
+			default:
+				throw new IllegalArgumentException("Unknown version: " + version);
+		}
+	}
+
+	static abstract class AbstractStateTableByKeyGroupReader<K, N, S>
+			implements StateTableByKeyGroupReader {
+
+		protected final StateTable<K, N, S> stateTable;
+
+		AbstractStateTableByKeyGroupReader(StateTable<K, N, S> stateTable) {
+			this.stateTable = stateTable;
+		}
+
+		@Override
+		public abstract void readMappingsInKeyGroup(DataInputView div, int keyGroupId) throws IOException;
+
+		protected TypeSerializer<K> getKeySerializer() {
+			return stateTable.keyContext.getKeySerializer();
+		}
+
+		protected TypeSerializer<N> getNamespaceSerializer() {
+			return stateTable.getNamespaceSerializer();
+		}
+
+		protected TypeSerializer<S> getStateSerializer() {
+			return stateTable.getStateSerializer();
+		}
+	}
+
+	static final class StateTableByKeyGroupReaderV1<K, N, S>
+			extends AbstractStateTableByKeyGroupReader<K, N, S> {
+
+		StateTableByKeyGroupReaderV1(StateTable<K, N, S> stateTable) {
+			super(stateTable);
+		}
+
+		@Override
+		public void readMappingsInKeyGroup(DataInputView inView, int keyGroupId) throws IOException {
+
+			if (inView.readByte() == 0) {
+				return;
+			}
+
+			final TypeSerializer<K> keySerializer = getKeySerializer();
+			final TypeSerializer<N> namespaceSerializer = getNamespaceSerializer();
+			final TypeSerializer<S> stateSerializer = getStateSerializer();
+
+			// V1 uses a namespace-compressing format: the namespace is written once per group
+			int numNamespaces = inView.readInt();
+			for (int k = 0; k < numNamespaces; k++) {
+				N namespace = namespaceSerializer.deserialize(inView);
+				int numEntries = inView.readInt();
+				for (int l = 0; l < numEntries; l++) {
+					K key = keySerializer.deserialize(inView);
+					S state = stateSerializer.deserialize(inView);
+					stateTable.put(key, keyGroupId, namespace, state);
+				}
+			}
+		}
+	}
+
+	private static final class StateTableByKeyGroupReaderV2<K, N, S>
+			extends AbstractStateTableByKeyGroupReader<K, N, S> {
+
+		StateTableByKeyGroupReaderV2(StateTable<K, N, S> stateTable) {
+			super(stateTable);
+		}
+
+		@Override
+		public void readMappingsInKeyGroup(DataInputView inView, int keyGroupId) throws IOException {
+
+			final TypeSerializer<K> keySerializer = getKeySerializer();
+			final TypeSerializer<N> namespaceSerializer = getNamespaceSerializer();
+			final TypeSerializer<S> stateSerializer = getStateSerializer();
+
+			int numKeys = inView.readInt();
+			for (int i = 0; i < numKeys; ++i) {
+				N namespace = namespaceSerializer.deserialize(inView);
+				K key = keySerializer.deserialize(inView);
+				S state = stateSerializer.deserialize(inView);
+				stateTable.put(key, keyGroupId, namespace, state);
+			}
+		}
+	}
+}
\ No newline at end of file

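Reading the two readers back, the per-key-group wire layouts they imply are as follows (derived from the code above; field order only, not an authoritative byte-level spec):

    V1 (namespace-grouped):
      byte hasData                -- 0 means the key group is empty
      int  numNamespaces
      per namespace: N namespace, int numEntries,
                     then numEntries x (K key, S state)

    V2 (flat, matching NestedMapsStateTableSnapshot#writeMappingsInKeyGroup):
      int  numMappings
      numMappings x (N namespace, K key, S state)
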
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableSnapshot.java
new file mode 100644
index 0000000..d4244d7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableSnapshot.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Interface for the snapshots of a {@link StateTable}. Offers a way to serialize the snapshot (by key-group). All
+ * snapshots should be released after usage.
+ */
+interface StateTableSnapshot {
+
+	/**
+	 * Writes the data for the specified key-group to the output.
+	 *
+	 * @param dov the output
+	 * @param keyGroupId the key-group to write
+	 * @throws IOException on write related problems
+	 */
+	void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException;
+
+	/**
+	 * Releases the snapshot. All snapshots should be released when they are no longer used, because some
+	 * implementations can only free their resources once the snapshot is released.
+	 */
+	void release();
+}
\ No newline at end of file

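The intended lifecycle is create, write each owned key group, then release in a finally block so resource-holding snapshots are always freed. A sketch (same package assumed, since createSnapshot() is package-private; the parameters stand in for objects the backend would provide):

    import org.apache.flink.core.memory.DataOutputView;
    import org.apache.flink.runtime.state.KeyGroupRange;
    import java.io.IOException;

    class SnapshotLifecycleSketch {
        static void writeSnapshot(
                StateTable<?, ?, ?> stateTable,
                KeyGroupRange range,
                DataOutputView out) throws IOException {

            StateTableSnapshot snapshot = stateTable.createSnapshot();
            try {
                for (int kg = range.getStartKeyGroup(); kg <= range.getEndKeyGroup(); kg++) {
                    snapshot.writeMappingsInKeyGroup(out, kg);
                }
            } finally {
                snapshot.release(); // always release, even on failure
            }
        }
    }
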
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index da01c09..f0bac1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -45,6 +45,9 @@ public class MemoryStateBackend extends AbstractStateBackend {
 	/** The maximal size that the snapshotted memory state may have */
 	private final int maxStateSize;
 
+	/** Switch to choose between synchronous and asynchronous snapshots */
+	private final boolean asynchronousSnapshots;
+
 	/**
 	 * Creates a new memory state backend that accepts states whose serialized forms are
 	 * up to the default state size (5 MB).
@@ -60,7 +63,29 @@ public class MemoryStateBackend extends AbstractStateBackend {
 	 * @param maxStateSize The maximal size of the serialized state
 	 */
 	public MemoryStateBackend(int maxStateSize) {
+		this(maxStateSize, false);
+	}
+
+	/**
+	 * Creates a new memory state backend that accepts states whose serialized forms are
+	 * up to the default state size (5 MB).
+	 *
+	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+	 */
+	public MemoryStateBackend(boolean asynchronousSnapshots) {
+		this(DEFAULT_MAX_STATE_SIZE, asynchronousSnapshots);
+	}
+
+	/**
+	 * Creates a new memory state backend that accepts states whose serialized forms are
+	 * up to the given number of bytes.
+	 *
+	 * @param maxStateSize The maximal size of the serialized state
+	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+	 */
+	public MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots) {
 		this.maxStateSize = maxStateSize;
+		this.asynchronousSnapshots = asynchronousSnapshots;
 	}
 
 	@Override
@@ -98,6 +123,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
 				env.getUserClassLoader(),
 				numberOfKeyGroups,
 				keyGroupRange,
+				asynchronousSnapshots,
 				env.getExecutionConfig());
 	}
 }

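With the new constructors, asynchronous snapshots become an opt-in per backend instance. Illustrative job-setup code (assuming the flink-streaming-java API of this release; not part of the commit):

    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class AsyncSnapshotSetup {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // maxStateSize = 5 MB (the default), asynchronousSnapshots = true
            env.setStateBackend(new MemoryStateBackend(5 * 1024 * 1024, true));
        }
    }
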
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index 2c385c1..b4d6eb7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -33,12 +33,12 @@ import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.runtime.query.netty.UnknownKvStateID;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.heap.HeapValueState;
-import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.util.MathUtils;
 import org.junit.AfterClass;
@@ -278,9 +278,8 @@ public class QueryableStateClientTest {
 
 				// Register state
 				HeapValueState<Integer, VoidNamespace, Integer> kvState = new HeapValueState<>(
-						keyedStateBackend,
 						descriptor,
-						new StateTable<Integer, VoidNamespace, Integer>(registeredBackendStateMetaInfo, new KeyGroupRange(0, 1)),
+						new NestedMapsStateTable<Integer, VoidNamespace, Integer>(keyedStateBackend, registeredBackendStateMetaInfo),
 						IntSerializer.INSTANCE,
 						VoidNamespaceSerializer.INSTANCE);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index 93094a4..4ed63a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -21,12 +21,10 @@ package org.apache.flink.runtime.query.netty.message;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ByteSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,13 +36,16 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
-
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,10 +55,19 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
+@RunWith(Parameterized.class)
 public class KvStateRequestSerializerTest {
 
 	private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
 
+	@Parameterized.Parameters
+	public static Collection<Boolean> parameters() {
+		return Arrays.asList(false, true);
+	}
+
+	@Parameterized.Parameter
+	public boolean async;
+
 	/**
 	 * Tests KvState request serialization.
 	 */
@@ -332,7 +342,9 @@ public class KvStateRequestSerializerTest {
 				mock(TaskKvStateRegistry.class),
 				LongSerializer.INSTANCE,
 				ClassLoader.getSystemClassLoader(),
-				1, new KeyGroupRange(0, 0),
+				1,
+				new KeyGroupRange(0, 0),
+				async,
 				new ExecutionConfig()
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
@@ -418,7 +430,7 @@ public class KvStateRequestSerializerTest {
 		KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3},
 			LongSerializer.INSTANCE);
 	}
-	
+
 	/**
 	 * Tests map serialization utils.
 	 */
@@ -429,11 +441,13 @@ public class KvStateRequestSerializerTest {
 		// objects for heap state list serialisation
 		final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
 			new HeapKeyedStateBackend<>(
-				mock(TaskKvStateRegistry.class),
-				LongSerializer.INSTANCE,
-				ClassLoader.getSystemClassLoader(),
-				1, new KeyGroupRange(0, 0),
-				new ExecutionConfig()
+					mock(TaskKvStateRegistry.class),
+					LongSerializer.INSTANCE,
+					ClassLoader.getSystemClassLoader(),
+					1,
+					new KeyGroupRange(0, 0),
+					async,
+					new ExecutionConfig()
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
@@ -481,7 +495,7 @@ public class KvStateRequestSerializerTest {
 			KvStateRequestSerializer.serializeKeyAndNamespace(
 				key, LongSerializer.INSTANCE,
 				VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
-		
+
 		final byte[] serializedValues = mapState.getSerializedValue(serializedKey);
 
 		Map<Long, String> actualValues = KvStateRequestSerializer.deserializeMap(serializedValues, userKeySerializer, userValueSerializer);
@@ -534,7 +548,7 @@ public class KvStateRequestSerializerTest {
 		KvStateRequestSerializer.deserializeMap(new byte[]{1, 1, 1, 1, 1, 1, 1, 1, 0},
 				LongSerializer.INSTANCE, LongSerializer.INSTANCE);
 	}
-	
+
 	/**
 	 * Tests map deserialization with too few bytes.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java
new file mode 100644
index 0000000..dd73e42
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+public class AsyncFileStateBackendTest extends FileStateBackendTest {
+
+	@Override
+	protected boolean useAsyncMode() {
+		return true;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java
new file mode 100644
index 0000000..ba4a89d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+public class AsyncMemoryStateBackendTest extends MemoryStateBackendTest {
+
+	@Override
+	protected boolean useAsyncMode() {
+		return true;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 75014e7..6be2343 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -50,7 +51,11 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 	@Override
 	protected FsStateBackend getStateBackend() throws Exception {
 		File checkpointPath = tempFolder.newFolder();
-		return new FsStateBackend(localFileUri(checkpointPath));
+		return new FsStateBackend(localFileUri(checkpointPath), useAsyncMode());
+	}
+
+	protected boolean useAsyncMode() {
+		return false;
 	}
 
 	// disable these because the verification does not work for this state backend
@@ -208,6 +213,7 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 		}
 	}
 
+	@Ignore
 	@Test
 	public void testConcurrentMapIfQueryable() throws Exception {
 		super.testConcurrentMapIfQueryable();

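The file system backend gets the same switch; getStateBackend() above passes it through a (String, boolean) constructor. A sketch with an illustrative checkpoint URI:

	// file system backend with asynchronous snapshots enabled; the URI is illustrative
	FsStateBackend asyncFsBackend = new FsStateBackend("file:///tmp/checkpoints", true);
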
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 362fcd6..48d56e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -44,7 +45,11 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 
 	@Override
 	protected MemoryStateBackend getStateBackend() throws Exception {
-		return new MemoryStateBackend();
+		return new MemoryStateBackend(useAsyncMode());
+	}
+
+	protected boolean useAsyncMode() {
+		return false;
 	}
 
 	// disable these because the verification does not work for this state backend
@@ -193,6 +198,7 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 		}
 	}
 
+	@Ignore
 	@Test
 	public void testConcurrentMapIfQueryable() throws Exception {
 		super.testConcurrentMapIfQueryable();

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 40ac72c..331c6bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -42,6 +42,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -50,10 +51,15 @@ import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateRegistryListener;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.heap.AbstractHeapState;
+import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
 import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.types.IntValue;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -67,6 +73,7 @@ import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RunnableFuture;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -791,7 +798,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	@SuppressWarnings("unchecked,rawtypes")
 	public void testMapState() {
@@ -823,7 +830,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			backend.setCurrentKey(1);
 			assertTrue(state.contains(1));
 			assertEquals("1", state.get(1));
-			assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }}, 
+			assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
 					getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
 			// draw a snapshot
@@ -848,12 +855,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 					getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 			backend.setCurrentKey(2);
 			assertEquals("102", state.get(102));
-			assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put(102, "102"); }}, 
+			assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put(102, "102"); }},
 					getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 			backend.setCurrentKey(3);
 			assertTrue(state.contains(103));
 			assertEquals("103", state.get(103));
-			assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }}, 
+			assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
 					getSerializedMap(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
 			List<Integer> keys = new ArrayList<>();
@@ -912,11 +919,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			backend.setCurrentKey(1);
 			assertEquals("1", restored1.get(1));
-			assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }}, 
+			assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
 					getSerializedMap(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 			backend.setCurrentKey(2);
 			assertEquals("2", restored1.get(2));
-			assertEquals(new HashMap<Integer, String>() {{ put (2, "2"); }}, 
+			assertEquals(new HashMap<Integer, String>() {{ put (2, "2"); }},
 					getSerializedMap(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
 			backend.dispose();
@@ -931,15 +938,15 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			backend.setCurrentKey(1);
 			assertEquals("101", restored2.get(1));
-			assertEquals(new HashMap<Integer, String>() {{ put (1, "101"); }}, 
+			assertEquals(new HashMap<Integer, String>() {{ put (1, "101"); }},
 					getSerializedMap(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 			backend.setCurrentKey(2);
 			assertEquals("102", restored2.get(102));
-			assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put (102, "102"); }}, 
+			assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put (102, "102"); }},
 					getSerializedMap(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 			backend.setCurrentKey(3);
 			assertEquals("103", restored2.get(103));
-			assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }}, 
+			assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
 					getSerializedMap(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
 			backend.dispose();
@@ -1111,7 +1118,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		backend.dispose();
 	}
-	
+
 	/**
 	 * This test verifies that state is correctly assigned to key groups and that restore
 	 * restores the relevant key groups in the backend.
@@ -1364,7 +1371,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testMapStateRestoreWithWrongSerializers() {
@@ -1507,11 +1514,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			backend.setCurrentKey(1);
 			state.update(121818273);
 
-			int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
-			StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
-			assertNotNull("State not set", stateTable.get(keyGroupIndex));
-			assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
-			assertTrue(stateTable.get(keyGroupIndex).get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);
+			StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ?, ?, ?>) kvState).getStateTable();
+			checkConcurrentStateTable(stateTable, numberOfKeyGroups);
 
 		}
 
@@ -1533,11 +1537,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			backend.setCurrentKey(1);
 			state.add(121818273);
 
-			int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
-			StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
-			assertNotNull("State not set", stateTable.get(keyGroupIndex));
-			assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
-			assertTrue(stateTable.get(keyGroupIndex).get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);
+			StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ?, ?, ?>) kvState).getStateTable();
+			checkConcurrentStateTable(stateTable, numberOfKeyGroups);
 		}
 
 		{
@@ -1564,11 +1565,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			backend.setCurrentKey(1);
 			state.add(121818273);
 
-			int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
-			StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
-			assertNotNull("State not set", stateTable.get(keyGroupIndex));
-			assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
-			assertTrue(stateTable.get(keyGroupIndex).get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);
+			StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ?, ?, ?>) kvState).getStateTable();
+			checkConcurrentStateTable(stateTable, numberOfKeyGroups);
 		}
 
 		{
@@ -1595,13 +1593,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			backend.setCurrentKey(1);
 			state.add(121818273);
 
-			int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
-			StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
-			assertNotNull("State not set", stateTable.get(keyGroupIndex));
-			assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
-			assertTrue(stateTable.get(keyGroupIndex).get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);
+			StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ?, ?, ?>) kvState).getStateTable();
+			checkConcurrentStateTable(stateTable, numberOfKeyGroups);
 		}
-		
+
 		{
 			// MapState
 			MapStateDescriptor<Integer, String> desc = new MapStateDescriptor<>("map-state", Integer.class, String.class);
@@ -1623,13 +1618,22 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
 			StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
 			assertNotNull("State not set", stateTable.get(keyGroupIndex));
-			assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
-			assertTrue(stateTable.get(keyGroupIndex).get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);
+			checkConcurrentStateTable(stateTable, numberOfKeyGroups);
 		}
 
 		backend.dispose();
 	}
 
+	private void checkConcurrentStateTable(StateTable<?, ?, ?> stateTable, int numberOfKeyGroups) {
+		assertNotNull("State not set", stateTable);
+		if (stateTable instanceof NestedMapsStateTable) {
+			int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
+			NestedMapsStateTable<?, ?, ?> nestedMapsStateTable = (NestedMapsStateTable<?, ?, ?>) stateTable;
+			assertTrue(nestedMapsStateTable.getState()[keyGroupIndex] instanceof ConcurrentHashMap);
+			assertTrue(nestedMapsStateTable.getState()[keyGroupIndex].get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);
+		}
+	}
+
 	/**
 	 * Tests registration with the KvStateRegistry.
 	 */
@@ -1688,7 +1692,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint()));
+			KeyGroupsStateHandle snapshot =
+					runSnapshot(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint()));
 			assertNull(snapshot);
 			backend.dispose();
 
@@ -1708,6 +1713,145 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		}
 	}
 
+	@Test
+	public void testAsyncSnapshot() throws Exception {
+		OneShotLatch waiter = new OneShotLatch();
+		BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
+		streamFactory.setWaiterLatch(waiter);
+
+		AbstractKeyedStateBackend<Integer> backend = null;
+		KeyGroupsStateHandle stateHandle = null;
+
+		try {
+			backend = createKeyedBackend(IntSerializer.INSTANCE);
+			InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState(
+					VoidNamespaceSerializer.INSTANCE,
+					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+			valueState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+			for (int i = 0; i < 10; ++i) {
+				backend.setCurrentKey(i);
+				valueState.update(i);
+			}
+
+			RunnableFuture<KeyGroupsStateHandle> snapshot =
+					backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
+			Thread runner = new Thread(snapshot);
+			runner.start();
+			for (int i = 0; i < 20; ++i) {
+				backend.setCurrentKey(i);
+				valueState.update(i + 1);
+				if (10 == i) {
+					waiter.await();
+				}
+			}
+
+			runner.join();
+			stateHandle = snapshot.get();
+
+			// test isolation
+			for (int i = 0; i < 20; ++i) {
+				backend.setCurrentKey(i);
+				Assert.assertEquals(i + 1, (int) valueState.value());
+			}
+
+		} finally {
+			if (null != backend) {
+				IOUtils.closeQuietly(backend);
+				backend.dispose();
+			}
+		}
+
+		Assert.assertNotNull(stateHandle);
+
+		backend = createKeyedBackend(IntSerializer.INSTANCE);
+		try {
+			backend.restore(Collections.singleton(stateHandle));
+			InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState(
+					VoidNamespaceSerializer.INSTANCE,
+					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+			valueState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+			for (int i = 0; i < 10; ++i) {
+				backend.setCurrentKey(i);
+				Assert.assertEquals(i, (int) valueState.value());
+			}
+
+			backend.setCurrentKey(11);
+			Assert.assertEquals(null, valueState.value());
+		} finally {
+			if (null != backend) {
+				IOUtils.closeQuietly(backend);
+				backend.dispose();
+			}
+		}
+	}
+
+	@Test
+	public void testAsyncSnapshotCancellation() throws Exception {
+		OneShotLatch blocker = new OneShotLatch();
+		OneShotLatch waiter = new OneShotLatch();
+		BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
+		streamFactory.setWaiterLatch(waiter);
+		streamFactory.setBlockerLatch(blocker);
+		streamFactory.setAfterNumberInvocations(100);
+
+		AbstractKeyedStateBackend<Integer> backend = null;
+		try {
+			backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+			if (!backend.supportsAsynchronousSnapshots()) {
+				return;
+			}
+
+			InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState(
+					VoidNamespaceSerializer.INSTANCE,
+					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+			valueState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+			for (int i = 0; i < 10; ++i) {
+				backend.setCurrentKey(i);
+				valueState.update(i);
+			}
+
+			RunnableFuture<KeyGroupsStateHandle> snapshot =
+					backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
+
+			Thread runner = new Thread(snapshot);
+			runner.start();
+
+			// wait until the code reached some stream read
+			waiter.await();
+
+			// close the backend to see if the close is propagated to the stream
+			backend.close();
+
+			//unblock the stream so that it can run into the IOException
+			blocker.trigger();
+
+			//dispose the backend
+			backend.dispose();
+
+			runner.join();
+
+			try {
+				snapshot.get();
+				fail("Close was not propagated.");
+			} catch (ExecutionException ex) {
+				//ignore
+			}
+
+		} finally {
+			if (null != backend) {
+				IOUtils.closeQuietly(backend);
+				backend.dispose();
+			}
+		}
+	}
+
 	private static class AppendingFold implements FoldFunction<Integer, String> {
 		private static final long serialVersionUID = 1L;
 
@@ -1764,7 +1908,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			return KvStateRequestSerializer.deserializeList(serializedValue, valueSerializer);
 		}
 	}
-	
+
 	/**
 	 * Returns the value by getting the serialized value and deserializing it
 	 * if it is not null.

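The new tests drive the snapshot as a RunnableFuture: the heavy lifting happens when the future is run, typically on a dedicated thread, while the task thread keeps mutating state. Stripped of the latches, the core pattern from testAsyncSnapshot is (checkpoint id and timestamp values are illustrative):

	// start the asynchronous part of the snapshot on its own thread
	RunnableFuture<KeyGroupsStateHandle> snapshot =
			backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
	Thread runner = new Thread(snapshot);
	runner.start();

	// the task thread may keep calling setCurrentKey()/update() here;
	// copy-on-write isolates the snapshot from these modifications

	runner.join();
	// obtain the completed handle; throws ExecutionException if the snapshot failed
	KeyGroupsStateHandle stateHandle = snapshot.get();
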

[3/4] flink git commit: [FLINK-5715] Asynchronous snapshots for heap-based keyed state backend

Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
new file mode 100644
index 0000000..d63b6d3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -0,0 +1,1066 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.TreeSet;
+
+/**
+ * Implementation of Flink's in-memory state tables with copy-on-write support. This map does not support null keys
+ * or namespaces.
+ * <p>
+ * {@link CopyOnWriteStateTable} sacrifices some peak performance and memory efficiency for features like incremental
+ * rehashing and asynchronous snapshots through copy-on-write. Copy-on-write tries to minimize the amount of copying by
+ * maintaining version meta data for both the map structure and the state objects. However, we must often proactively
+ * copy state objects when we hand them to the user.
+ * <p>
+ * As with any state backend, users should not keep references to state objects that they obtained from state backends
+ * beyond the scope of their user function calls.
+ * <p>
+ * Some brief maintenance notes:
+ * <p>
+ * 1) Flattening the underlying data structure from nested maps (namespace) -> (key) -> (state) to one flat map
+ * (key, namespace) -> (state) brings certain performance trade-offs. In theory, the flat map has one less level of
+ * indirection compared to the nested map. However, the nested map naturally de-duplicates namespace objects for which
+ * #equals() is true. This potentially leads to a lot of redundant namespace objects in the flattened version. Those,
+ * in turn, can again introduce more cache misses because we need to follow the namespace object on all operations to
+ * ensure entry identities. Obviously, copy-on-write can also add memory overhead. So does the meta data required to
+ * track copy-on-write (state and entry versions on {@link StateTableEntry}).
+ * <p>
+ * 2) A flat map structure is a lot easier when it comes to tracking copy-on-write of the map structure.
+ * <p>
+ * 3) The nested structure had the (never used) advantage that we can easily drop and iterate whole namespaces. This
+ * could give locality advantages for certain access patterns, e.g. iterating a namespace.
+ * <p>
+ * 4) The serialization format is changed from namespace-prefix compressed (as naturally provided by the old nested
+ * structure) to making all entries self-contained as (key, namespace, state).
+ * <p>
+ * 5) We got rid of having multiple nested tables, one for each key-group. Instead, we partition state into key-groups
+ * on-the-fly, during the asynchronous part of a snapshot.
+ * <p>
+ * 6) Currently, a state table can only grow, but never shrinks on low load. We could easily add this if required.
+ * <p>
+ * 7) Heap based state backends like this can easily cause a lot of GC activity. Besides using G1 as garbage collector,
+ * we should provide an additional state backend that operates on off-heap memory. This would sacrifice peak performance
+ * (due to de/serialization of objects) for a lower, but more constant throughput and potentially huge simplifications
+ * w.r.t. copy-on-write.
+ * <p>
+ * 8) We could try a hybrid of a serialized and object based backends, where key and namespace of the entries are both
+ * serialized in one byte-array.
+ * <p>
+ * 9) We could consider smaller types (e.g. short) for the version counting and think about some reset strategy before
+ * overflows, when there is no snapshot running. However, this would have to touch all entries in the map.
+ * <p>
+ * This class was initially based on the {@link java.util.HashMap} implementation of the Android JDK, but is now heavily
+ * customized towards the use case of a table for state entries.
+ *
+ * IMPORTANT: the contracts for this class rely on the user not holding any references to objects returned by this map
+ * beyond the life cycle of per-element operations. Or phrased differently, all get-update-put operations on a mapping
+ * should be within one call of processElement. Otherwise, the user must take care of taking deep copies, e.g. for
+ * caching purposes.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of value.
+ */
+public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implements Iterable<StateEntry<K, N, S>> {
+
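+	// Example of the per-element contract described above: within a single user
+	// function call, a get-update-put sequence such as
+	//
+	//     ArrayList<Integer> list = stateTable.get(namespace); // may hand out a fresh copy
+	//     list.add(element);
+	//     stateTable.put(namespace, list);
+	//
+	// is fine, but 'list' must not be cached across calls without a deep copy.
+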
+	/**
+	 * The logger.
+	 */
+	private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteStateTable.class);
+
+	/**
+	 * Min capacity (other than zero) for a {@link CopyOnWriteStateTable}. Must be a power of two
+	 * greater than 1 (and less than 1 << 30).
+	 */
+	private static final int MINIMUM_CAPACITY = 4;
+
+	/**
+	 * Max capacity for a {@link CopyOnWriteStateTable}. Must be a power of two >= MINIMUM_CAPACITY.
+	 */
+	private static final int MAXIMUM_CAPACITY = 1 << 30;
+
+	/**
+	 * Minimum number of entries that one step of incremental rehashing migrates from the old to the new sub-table.
+	 */
+	private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;
+
+	/**
+	 * An empty table shared by all zero-capacity maps (typically from default
+	 * constructor). It is never written to, and replaced on first put. Its size
+	 * is set to half the minimum, so that the first resize will create a
+	 * minimum-sized table.
+	 */
+	private static final StateTableEntry<?, ?, ?>[] EMPTY_TABLE = new StateTableEntry[MINIMUM_CAPACITY >>> 1];
+
+	/**
+	 * Empty entry that we use to bootstrap our {@link CopyOnWriteStateTable.StateEntryIterator}.
+	 */
+	private static final StateTableEntry<?, ?, ?> ITERATOR_BOOTSTRAP_ENTRY = new StateTableEntry<>();
+
+	/**
+	 * Maintains an ordered set of version ids that are still in use by unreleased snapshots.
+	 */
+	private final TreeSet<Integer> snapshotVersions;
+
+	/**
+	 * This is the primary entry array (hash directory) of the state table. If no incremental rehash is ongoing, this
+	 * is the only used table.
+	 **/
+	private StateTableEntry<K, N, S>[] primaryTable;
+
+	/**
+	 * We maintain a secondary entry array while performing an incremental rehash. The purpose is to slowly migrate
+	 * entries from the primary table to this resized table array. When all entries are migrated, this becomes the new
+	 * primary table.
+	 */
+	private StateTableEntry<K, N, S>[] incrementalRehashTable;
+
+	/**
+	 * The current number of mappings in the primary table.
+	 */
+	private int primaryTableSize;
+
+	/**
+	 * The current number of mappings in the rehash table.
+	 */
+	private int incrementalRehashTableSize;
+
+	/**
+	 * The next index for a step of incremental rehashing in the primary table.
+	 */
+	private int rehashIndex;
+
+	/**
+	 * The current version of this map. Used for copy-on-write mechanics.
+	 */
+	private int stateTableVersion;
+
+	/**
+	 * The highest version of this map that is still required by any unreleased snapshot.
+	 */
+	private int highestRequiredSnapshotVersion;
+
+	/**
+	 * The last namespace that was actually inserted. This is a small optimization to reduce duplicate namespace objects.
+	 */
+	private N lastNamespace;
+
+	/**
+	 * The {@link CopyOnWriteStateTable} is rehashed when its size exceeds this threshold.
+	 * The value of this field is generally .75 * capacity, except when
+	 * the capacity is zero, as described in the EMPTY_TABLE declaration
+	 * above.
+	 */
+	private int threshold;
+
+	/**
+	 * Incremented by "structural modifications" to allow (best effort)
+	 * detection of concurrent modification.
+	 */
+	private int modCount;
+
+	/**
+	 * Constructs a new {@code StateTable} with default capacity of 1024.
+	 *
+	 * @param keyContext the key context.
+	 * @param metaInfo   the meta information, including the type serializer for state copy-on-write.
+	 */
+	CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+		this(keyContext, metaInfo, 1024);
+	}
+
+	/**
+	 * Constructs a new {@code StateTable} instance with the specified capacity.
+	 *
+	 * @param keyContext the key context.
+	 * @param metaInfo   the meta information, including the type serializer for state copy-on-write.
+	 * @param capacity   the initial capacity of this hash map.
+	 * @throws IllegalArgumentException when the capacity is less than zero.
+	 */
+	@SuppressWarnings("unchecked")
+	private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo, int capacity) {
+		super(keyContext, metaInfo);
+
+		// initialize tables to EMPTY_TABLE.
+		this.primaryTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
+		this.incrementalRehashTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
+
+		// initialize sizes to 0.
+		this.primaryTableSize = 0;
+		this.incrementalRehashTableSize = 0;
+
+		this.rehashIndex = 0;
+		this.stateTableVersion = 0;
+		this.highestRequiredSnapshotVersion = 0;
+		this.snapshotVersions = new TreeSet<>();
+
+		if (capacity < 0) {
+			throw new IllegalArgumentException("Capacity: " + capacity);
+		}
+
+		if (capacity == 0) {
+			threshold = -1;
+			return;
+		}
+
+		if (capacity < MINIMUM_CAPACITY) {
+			capacity = MINIMUM_CAPACITY;
+		} else if (capacity > MAXIMUM_CAPACITY) {
+			capacity = MAXIMUM_CAPACITY;
+		} else {
+			capacity = MathUtils.roundUpToPowerOfTwo(capacity);
+		}
+		primaryTable = makeTable(capacity);
+	}
+
+	// Public API from StateTable ---------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the total number of entries in this {@link CopyOnWriteStateTable}. This is the sum of both sub-tables.
+	 *
+	 * @return the number of entries in this {@link CopyOnWriteStateTable}.
+	 */
+	@Override
+	public int size() {
+		return primaryTableSize + incrementalRehashTableSize;
+	}
+
+	@Override
+	public S get(K key, N namespace) {
+
+		final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+		final int requiredVersion = highestRequiredSnapshotVersion;
+		final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+		int index = hash & (tab.length - 1);
+
+		for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
+			final K eKey = e.key;
+			final N eNamespace = e.namespace;
+			if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {
+
+				// copy-on-write check for state
+				if (e.stateVersion < requiredVersion) {
+					// copy-on-write check for entry
+					if (e.entryVersion < requiredVersion) {
+						e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
+					}
+					e.stateVersion = stateTableVersion;
+					e.state = getStateSerializer().copy(e.state);
+				}
+
+				return e.state;
+			}
+		}
+
+		return null;
+	}
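+
+	// Illustration of the copy-on-write read path above: if an unreleased snapshot
+	// pins version 1 (highestRequiredSnapshotVersion == 1) and the entry's state was
+	// last written at stateVersion 0, the lookup first copies the entry chain where
+	// necessary, deep-copies the state object via the state serializer, and stamps
+	// it with the current stateTableVersion before returning it. The snapshot thus
+	// keeps reading the old, untouched state object.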
+
+	@Override
+	public void put(K key, int keyGroup, N namespace, S state) {
+		put(key, namespace, state);
+	}
+
+	@Override
+	public S get(N namespace) {
+		return get(keyContext.getCurrentKey(), namespace);
+	}
+
+	@Override
+	public boolean containsKey(N namespace) {
+		return containsKey(keyContext.getCurrentKey(), namespace);
+	}
+
+	@Override
+	public void put(N namespace, S state) {
+		put(keyContext.getCurrentKey(), namespace, state);
+	}
+
+	@Override
+	public S putAndGetOld(N namespace, S state) {
+		return putAndGetOld(keyContext.getCurrentKey(), namespace, state);
+	}
+
+	@Override
+	public void remove(N namespace) {
+		remove(keyContext.getCurrentKey(), namespace);
+	}
+
+	@Override
+	public S removeAndGetOld(N namespace) {
+		return removeAndGetOld(keyContext.getCurrentKey(), namespace);
+	}
+
+	@Override
+	public <T> void transform(N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception {
+		transform(keyContext.getCurrentKey(), namespace, value, transformation);
+	}
+
+	// Private implementation details of the API methods ---------------------------------------------------------------
+
+	/**
+	 * Returns whether this table contains the specified key/namespace composite key.
+	 *
+	 * @param key       the key in the composite key to search for. Not null.
+	 * @param namespace the namespace in the composite key to search for. Not null.
+	 * @return {@code true} if this map contains the specified key/namespace composite key,
+	 * {@code false} otherwise.
+	 */
+	boolean containsKey(K key, N namespace) {
+
+		final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+		final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+		int index = hash & (tab.length - 1);
+
+		for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
+			final K eKey = e.key;
+			final N eNamespace = e.namespace;
+
+			if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/**
+	 * Maps the specified key/namespace composite key to the specified value. This method should be preferred
+	 * over {@link #putAndGetOld(Object, Object, Object)} when the caller is not interested
+	 * in the old value, because this can potentially reduce copy-on-write activity.
+	 *
+	 * @param key       the key. Not null.
+	 * @param namespace the namespace. Not null.
+	 * @param value     the value. Can be null.
+	 */
+	void put(K key, N namespace, S value) {
+		final StateTableEntry<K, N, S> e = putEntry(key, namespace);
+
+		e.state = value;
+		e.stateVersion = stateTableVersion;
+	}
+
+	/**
+	 * Maps the specified key/namespace composite key to the specified value. Returns the previous state that was
+	 * registered under the composite key.
+	 *
+	 * @param key       the key. Not null.
+	 * @param namespace the namespace. Not null.
+	 * @param value     the value. Can be null.
+	 * @return the value of any previous mapping with the specified key or
+	 * {@code null} if there was no such mapping.
+	 */
+	S putAndGetOld(K key, N namespace, S value) {
+
+		final StateTableEntry<K, N, S> e = putEntry(key, namespace);
+
+		// copy-on-write check for state
+		S oldState = (e.stateVersion < highestRequiredSnapshotVersion) ?
+				getStateSerializer().copy(e.state) :
+				e.state;
+
+		e.state = value;
+		e.stateVersion = stateTableVersion;
+
+		return oldState;
+	}
+
+	/**
+	 * Removes the mapping with the specified key/namespace composite key from this map. This method should be preferred
+	 * over {@link #removeAndGetOld(Object, Object)} when the caller is not interested in the old value, because this
+	 * can potentially reduce copy-on-write activity.
+	 *
+	 * @param key       the key of the mapping to remove. Not null.
+	 * @param namespace the namespace of the mapping to remove. Not null.
+	 */
+	void remove(K key, N namespace) {
+		removeEntry(key, namespace);
+	}
+
+	/**
+	 * Removes the mapping with the specified key/namespace composite key from this map, returning the state that was
+	 * found under the entry.
+	 *
+	 * @param key       the key of the mapping to remove. Not null.
+	 * @param namespace the namespace of the mapping to remove. Not null.
+	 * @return the value of the removed mapping or {@code null} if no mapping
+	 * for the specified key was found.
+	 */
+	S removeAndGetOld(K key, N namespace) {
+
+		final StateTableEntry<K, N, S> e = removeEntry(key, namespace);
+
+		return e != null ?
+				// copy-on-write check for state
+				(e.stateVersion < highestRequiredSnapshotVersion ?
+						getStateSerializer().copy(e.state) :
+						e.state) :
+				null;
+	}
+
+	/**
+	 * @param key            the key of the mapping to transform. Not null.
+	 * @param namespace      the namespace of the mapping to transform. Not null.
+	 * @param value          the value that is the second input for the transformation.
+	 * @param transformation the transformation function to apply on the old state and the given value.
+	 * @param <T>            type of the value that is the second input to the {@link StateTransformationFunction}.
+	 * @throws Exception exceptions that may happen when applying the function.
+	 * @see #transform(Object, Object, StateTransformationFunction)
+	 */
+	<T> void transform(
+			K key,
+			N namespace,
+			T value,
+			StateTransformationFunction<S, T> transformation) throws Exception {
+
+		final StateTableEntry<K, N, S> entry = putEntry(key, namespace);
+
+		// copy-on-write check for state
+		entry.state = transformation.apply(
+				(entry.stateVersion < highestRequiredSnapshotVersion) ?
+						getStateSerializer().copy(entry.state) :
+						entry.state,
+				value);
+		entry.stateVersion = stateTableVersion;
+	}
+
+	/**
+	 * Helper method that is the basis for operations that add mappings.
+	 */
+	private StateTableEntry<K, N, S> putEntry(K key, N namespace) {
+
+		final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+		final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+		int index = hash & (tab.length - 1);
+
+		for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
+			if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
+
+				// copy-on-write check for entry
+				if (e.entryVersion < highestRequiredSnapshotVersion) {
+					e = handleChainedEntryCopyOnWrite(tab, index, e);
+				}
+
+				return e;
+			}
+		}
+
+		++modCount;
+		if (size() > threshold) {
+			doubleCapacity();
+		}
+
+		return addNewStateTableEntry(tab, key, namespace, hash);
+	}
+
+	/**
+	 * Helper method that is the basis for operations that remove mappings.
+	 */
+	private StateTableEntry<K, N, S> removeEntry(K key, N namespace) {
+
+		final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+		final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+		int index = hash & (tab.length - 1);
+
+		for (StateTableEntry<K, N, S> e = tab[index], prev = null; e != null; prev = e, e = e.next) {
+			if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
+				if (prev == null) {
+					tab[index] = e.next;
+				} else {
+					// copy-on-write check for entry
+					if (prev.entryVersion < highestRequiredSnapshotVersion) {
+						prev = handleChainedEntryCopyOnWrite(tab, index, prev);
+					}
+					prev.next = e.next;
+				}
+				++modCount;
+				if (tab == primaryTable) {
+					--primaryTableSize;
+				} else {
+					--incrementalRehashTableSize;
+				}
+				return e;
+			}
+		}
+		return null;
+	}
+
+	private void checkKeyNamespacePreconditions(K key, N namespace) {
+		Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
+		Preconditions.checkNotNull(namespace, "Provided namespace is null.");
+	}
+
+	// Meta data setter / getter and toString --------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializer<S> getStateSerializer() {
+		return metaInfo.getStateSerializer();
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return metaInfo.getNamespaceSerializer();
+	}
+
+	@Override
+	public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() {
+		return metaInfo;
+	}
+
+	@Override
+	public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+		this.metaInfo = metaInfo;
+	}
+
+	// Iteration  ------------------------------------------------------------------------------------------------------
+
+	@Override
+	public Iterator<StateEntry<K, N, S>> iterator() {
+		return new StateEntryIterator();
+	}
+
+	// Private utility functions for StateTable management -------------------------------------------------------------
+
+	/**
+	 * @see #releaseSnapshot(CopyOnWriteStateTableSnapshot)
+	 */
+	@VisibleForTesting
+	void releaseSnapshot(int snapshotVersion) {
+		// we guard against concurrent modifications of highestRequiredSnapshotVersion between snapshot and release.
+		// Only stale reads from the result of #releaseSnapshot calls are ok.
+		synchronized (snapshotVersions) {
+			Preconditions.checkState(snapshotVersions.remove(snapshotVersion), "Attempt to release unknown snapshot version");
+			highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ? 0 : snapshotVersions.last();
+		}
+	}
+
+	/**
+	 * Creates a (combined) copy of the table arrays for a snapshot. This method must be called by the same Thread that
+	 * does modifications to the {@link CopyOnWriteStateTable}.
+	 */
+	@VisibleForTesting
+	@SuppressWarnings("unchecked")
+	StateTableEntry<K, N, S>[] snapshotTableArrays() {
+
+		// we guard against concurrent modifications of highestRequiredSnapshotVersion between snapshot and release.
+		// Only stale reads from the result of #releaseSnapshot calls are ok. This is why we must call this method
+		// from the same thread that does all the modifications to the table.
+		synchronized (snapshotVersions) {
+
+			// increase the table version for copy-on-write and register the snapshot
+			if (++stateTableVersion < 0) {
+				// this is just a safety net against overflows, but should never happen in practice (i.e., only after 2^31 snapshots)
+				throw new IllegalStateException("Version count overflow in CopyOnWriteStateTable. Enforcing restart.");
+			}
+
+			highestRequiredSnapshotVersion = stateTableVersion;
+			snapshotVersions.add(highestRequiredSnapshotVersion);
+		}
+
+		StateTableEntry<K, N, S>[] table = primaryTable;
+		if (isRehashing()) {
+			// consider both tables for the snapshot, the rehash index tells us which part of the two tables we need
+			final int localRehashIndex = rehashIndex;
+			final int localCopyLength = table.length - localRehashIndex;
+			StateTableEntry<K, N, S>[] copy = new StateTableEntry[localRehashIndex + table.length];
+			// for the primary table, take every index >= rhIdx.
+			System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);
+
+			// for the new table, we are sure that two regions contain all the entries:
+			// [0, rhIdx[ AND [table.length / 2, table.length / 2 + rhIdx[
+			table = incrementalRehashTable;
+			System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
+			System.arraycopy(table, table.length >>> 1, copy, localCopyLength + localRehashIndex, localRehashIndex);
+
+			return copy;
+		} else {
+			// we only need to copy the primary table
+			return Arrays.copyOf(table, table.length);
+		}
+	}
+
+	/**
+	 * Allocate a table of the given capacity and set the threshold accordingly.
+	 *
+	 * @param newCapacity must be a power of two
+	 */
+	private StateTableEntry<K, N, S>[] makeTable(int newCapacity) {
+
+		if (MAXIMUM_CAPACITY == newCapacity) {
+			LOG.warn("Maximum capacity of 2^30 in StateTable reached. Cannot increase hash table size. This can lead " +
+					"to more collisions and lower performance. Please consider scaling-out your job or using a " +
+					"different keyed state backend implementation!");
+		}
+
+		threshold = (newCapacity >> 1) + (newCapacity >> 2); // 3/4 capacity
+		@SuppressWarnings("unchecked") StateTableEntry<K, N, S>[] newTable
+				= (StateTableEntry<K, N, S>[]) new StateTableEntry[newCapacity];
+		return newTable;
+	}
+
+	/**
+	 * Creates and inserts a new {@link StateTableEntry}.
+	 */
+	private StateTableEntry<K, N, S> addNewStateTableEntry(
+			StateTableEntry<K, N, S>[] table,
+			K key,
+			N namespace,
+			int hash) {
+
+		// small optimization that aims to avoid holding references on duplicate namespace objects
+		if (namespace.equals(lastNamespace)) {
+			namespace = lastNamespace;
+		} else {
+			lastNamespace = namespace;
+		}
+
+		int index = hash & (table.length - 1);
+		StateTableEntry<K, N, S> newEntry = new StateTableEntry<>(
+				key,
+				namespace,
+				null,
+				hash,
+				table[index],
+				stateTableVersion,
+				stateTableVersion);
+		table[index] = newEntry;
+
+		if (table == primaryTable) {
+			++primaryTableSize;
+		} else {
+			++incrementalRehashTableSize;
+		}
+		return newEntry;
+	}
+
+	/**
+	 * Select the sub-table which is responsible for entries with the given hash code.
+	 *
+	 * @param hashCode the hash code which we use to decide about the table that is responsible.
+	 * @return the sub-table that is responsible for the entry with the given hash code.
+	 */
+	private StateTableEntry<K, N, S>[] selectActiveTable(int hashCode) {
+		return (hashCode & (primaryTable.length - 1)) >= rehashIndex ? primaryTable : incrementalRehashTable;
+	}
+
+	/**
+	 * Doubles the capacity of the hash table by starting an incremental rehash into an
+	 * enlarged table. Entries are migrated to their correct buckets step by step, as part
+	 * of subsequent operations. If the current capacity is MAXIMUM_CAPACITY, this method
+	 * is a no-op.
+	 */
+	private void doubleCapacity() {
+
+		// There can only be one rehash in flight. Given the number of incremental rehash steps we take per operation, this should always hold.
+		Preconditions.checkState(!isRehashing(), "There is already a rehash in progress.");
+
+		StateTableEntry<K, N, S>[] oldTable = primaryTable;
+
+		int oldCapacity = oldTable.length;
+
+		if (oldCapacity == MAXIMUM_CAPACITY) {
+			return;
+		}
+
+		incrementalRehashTable = makeTable(oldCapacity * 2);
+	}
+
+	/**
+	 * Returns true, if an incremental rehash is in progress.
+	 */
+	@VisibleForTesting
+	boolean isRehashing() {
+		// if we rehash, the secondary table is not empty
+		return EMPTY_TABLE != incrementalRehashTable;
+	}
+
+	/**
+	 * Computes the hash for the composite of key and namespace and performs some steps of incremental rehash if
+	 * incremental rehashing is in progress.
+	 */
+	private int computeHashForOperationAndDoIncrementalRehash(K key, N namespace) {
+
+		checkKeyNamespacePreconditions(key, namespace);
+
+		if (isRehashing()) {
+			incrementalRehash();
+		}
+
+		return compositeHash(key, namespace);
+	}
+
+	/**
+	 * Runs a number of steps for incremental rehashing.
+	 */
+	@SuppressWarnings("unchecked")
+	private void incrementalRehash() {
+
+		StateTableEntry<K, N, S>[] oldTable = primaryTable;
+		StateTableEntry<K, N, S>[] newTable = incrementalRehashTable;
+
+		int oldCapacity = oldTable.length;
+		int newMask = newTable.length - 1;
+		int requiredVersion = highestRequiredSnapshotVersion;
+		int rhIdx = rehashIndex;
+		int transferred = 0;
+
+		// we migrate a certain minimum amount of entries from the old to the new table
+		while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {
+
+			StateTableEntry<K, N, S> e = oldTable[rhIdx];
+
+			while (e != null) {
+				// copy-on-write check for entry
+				if (e.entryVersion < requiredVersion) {
+					e = new StateTableEntry<>(e, stateTableVersion);
+				}
+				StateTableEntry<K, N, S> n = e.next;
+				int pos = e.hash & newMask;
+				e.next = newTable[pos];
+				newTable[pos] = e;
+				e = n;
+				++transferred;
+			}
+
+			oldTable[rhIdx] = null;
+			if (++rhIdx == oldCapacity) {
+				//here, the rehash is complete and we release resources and reset fields
+				primaryTable = newTable;
+				incrementalRehashTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
+				primaryTableSize += incrementalRehashTableSize;
+				incrementalRehashTableSize = 0;
+				rehashIndex = 0;
+				return;
+			}
+		}
+
+		// sync our local bookkeeping with the official bookkeeping fields
+		primaryTableSize -= transferred;
+		incrementalRehashTableSize += transferred;
+		rehashIndex = rhIdx;
+	}
+
+	/**
+	 * Performs copy-on-write for the chain of entries in one table bucket. We iterate the (hopefully still cached)
+	 * chain and replace all links up to the 'untilEntry' that we actually want to modify.
+	 */
+	private StateTableEntry<K, N, S> handleChainedEntryCopyOnWrite(
+			StateTableEntry<K, N, S>[] tab,
+			int tableIdx,
+			StateTableEntry<K, N, S> untilEntry) {
+
+		final int required = highestRequiredSnapshotVersion;
+
+		StateTableEntry<K, N, S> current = tab[tableIdx];
+		StateTableEntry<K, N, S> copy;
+
+		if (current.entryVersion < required) {
+			copy = new StateTableEntry<>(current, stateTableVersion);
+			tab[tableIdx] = copy;
+		} else {
+			// nothing to do, just advance copy to current
+			copy = current;
+		}
+
+		// we iterate the chain up to 'untilEntry'
+		while (current != untilEntry) {
+
+			//advance current
+			current = current.next;
+
+			if (current.entryVersion < required) {
+				// copy and advance the current's copy
+				copy.next = new StateTableEntry<>(current, stateTableVersion);
+				copy = copy.next;
+			} else {
+				// nothing to do, just advance copy to current
+				copy = current;
+			}
+		}
+
+		return copy;
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <K, N, S> StateTableEntry<K, N, S> getBootstrapEntry() {
+		return (StateTableEntry<K, N, S>) ITERATOR_BOOTSTRAP_ENTRY;
+	}
+
+	/**
+	 * Helper function that creates and scrambles a composite hash for key and namespace.
+	 */
+	private static int compositeHash(Object key, Object namespace) {
+		// create composite key through XOR, then apply some bit-mixing for better distribution of skewed keys.
+		return MathUtils.bitMix(key.hashCode() ^ namespace.hashCode());
+	}
+
+	// Snapshotting ----------------------------------------------------------------------------------------------------
+
+	int getStateTableVersion() {
+		return stateTableVersion;
+	}
+
+	/**
+	 * Creates a snapshot of this {@link CopyOnWriteStateTable}, to be written in checkpointing. The snapshot integrity
+	 * is protected through copy-on-write from the {@link CopyOnWriteStateTable}. Users should call
+	 * {@link #releaseSnapshot(CopyOnWriteStateTableSnapshot)} after using the returned object.
+	 *
+	 * @return a snapshot from this {@link CopyOnWriteStateTable}, for checkpointing.
+	 */
+	@Override
+	public CopyOnWriteStateTableSnapshot<K, N, S> createSnapshot() {
+		return new CopyOnWriteStateTableSnapshot<>(this);
+	}
+
+	/**
+	 * Releases a snapshot for this {@link CopyOnWriteStateTable}. This method should be called once a snapshot is no longer needed,
+	 * so that the {@link CopyOnWriteStateTable} can stop considering this snapshot for copy-on-write, thus avoiding unnecessary
+	 * object creation.
+	 *
+	 * @param snapshotToRelease the snapshot to release, which was previously created by this state table.
+	 */
+	void releaseSnapshot(CopyOnWriteStateTableSnapshot<K, N, S> snapshotToRelease) {
+
+		Preconditions.checkArgument(snapshotToRelease.isOwner(this),
+				"Cannot release snapshot which is owned by a different state table.");
+
+		releaseSnapshot(snapshotToRelease.getSnapshotVersion());
+	}
+
+	// StateTableEntry -------------------------------------------------------------------------------------------------
+
+	/**
+	 * One entry in the {@link CopyOnWriteStateTable}. This is a triplet of key, namespace, and state. Key and namespace
+	 * together serve as a composite key for the state. This class also contains some management meta data for
+	 * copy-on-write, a pointer that links {@link StateTableEntry}s into a chain, and a cached hash code.
+	 *
+	 * @param <K> type of key.
+	 * @param <N> type of namespace.
+	 * @param <S> type of state.
+	 */
+	static class StateTableEntry<K, N, S> implements StateEntry<K, N, S> {
+
+		/**
+		 * The key. Assumed to be immutable and not null.
+		 */
+		final K key;
+
+		/**
+		 * The namespace. Assumed to be immutable and not null.
+		 */
+		final N namespace;
+
+		/**
+		 * The state. This is not final to allow exchanging the object for copy-on-write. Can be null.
+		 */
+		S state;
+
+		/**
+		 * Link to another {@link StateTableEntry}. This is used to resolve collisions in the
+		 * {@link CopyOnWriteStateTable} through chaining.
+		 */
+		StateTableEntry<K, N, S> next;
+
+		/**
+		 * The version of this {@link StateTableEntry}. This is meta data for copy-on-write of the table structure.
+		 */
+		int entryVersion;
+
+		/**
+		 * The version of the state object in this entry. This is meta data for copy-on-write of the state object itself.
+		 */
+		int stateVersion;
+
+		/**
+		 * The computed secondary hash for the composite of key and namespace.
+		 */
+		final int hash;
+
+		StateTableEntry() {
+			this(null, null, null, 0, null, 0, 0);
+		}
+
+		StateTableEntry(StateTableEntry<K, N, S> other, int entryVersion) {
+			this(other.key, other.namespace, other.state, other.hash, other.next, entryVersion, other.stateVersion);
+		}
+
+		StateTableEntry(
+				K key,
+				N namespace,
+				S state,
+				int hash,
+				StateTableEntry<K, N, S> next,
+				int entryVersion,
+				int stateVersion) {
+			this.key = key;
+			this.namespace = namespace;
+			this.hash = hash;
+			this.next = next;
+			this.entryVersion = entryVersion;
+			this.state = state;
+			this.stateVersion = stateVersion;
+		}
+
+		public final void setState(S value, int mapVersion) {
+			// naturally, we can update the state version every time we replace the old state with a different object
+			if (value != state) {
+				this.state = value;
+				this.stateVersion = mapVersion;
+			}
+		}
+
+		@Override
+		public K getKey() {
+			return key;
+		}
+
+		@Override
+		public N getNamespace() {
+			return namespace;
+		}
+
+		@Override
+		public S getState() {
+			return state;
+		}
+
+		@Override
+		public final boolean equals(Object o) {
+			if (!(o instanceof CopyOnWriteStateTable.StateTableEntry)) {
+				return false;
+			}
+
+			StateEntry<?, ?, ?> e = (StateEntry<?, ?, ?>) o;
+			return e.getKey().equals(key)
+					&& e.getNamespace().equals(namespace)
+					&& Objects.equals(e.getState(), state);
+		}
+
+		@Override
+		public final int hashCode() {
+			return (key.hashCode() ^ namespace.hashCode()) ^ Objects.hashCode(state);
+		}
+
+		@Override
+		public final String toString() {
+			return "(" + key + "|" + namespace + ")=" + state;
+		}
+	}
+
+	// For testing  ----------------------------------------------------------------------------------------------------
+
+	@Override
+	public int sizeOfNamespace(Object namespace) {
+		int count = 0;
+		for (StateEntry<K, N, S> entry : this) {
+			if (null != entry && namespace.equals(entry.getNamespace())) {
+				++count;
+			}
+		}
+		return count;
+	}
+
+	// StateEntryIterator  ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Iterator over the entries in a {@link CopyOnWriteStateTable}.
+	 */
+	class StateEntryIterator implements Iterator<StateEntry<K, N, S>> {
+		private StateTableEntry<K, N, S>[] activeTable;
+		private int nextTablePosition;
+		private StateTableEntry<K, N, S> nextEntry;
+		private int expectedModCount;
+
+		StateEntryIterator() {
+			this.activeTable = primaryTable;
+			this.nextTablePosition = 0;
+			this.expectedModCount = modCount;
+			this.nextEntry = getBootstrapEntry();
+			advanceIterator();
+		}
+
+		private StateTableEntry<K, N, S> advanceIterator() {
+
+			StateTableEntry<K, N, S> entryToReturn = nextEntry;
+			StateTableEntry<K, N, S> next = entryToReturn.next;
+
+			// consider both sub-tables to cover the case of an ongoing rehash
+			while (next == null) {
+
+				StateTableEntry<K, N, S>[] tab = activeTable;
+
+				while (nextTablePosition < tab.length) {
+					next = tab[nextTablePosition++];
+
+					if (next != null) {
+						nextEntry = next;
+						return entryToReturn;
+					}
+				}
+
+				if (activeTable == incrementalRehashTable) {
+					break;
+				}
+
+				activeTable = incrementalRehashTable;
+				nextTablePosition = 0;
+			}
+
+			nextEntry = next;
+			return entryToReturn;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return nextEntry != null;
+		}
+
+		@Override
+		public StateTableEntry<K, N, S> next() {
+			if (modCount != expectedModCount) {
+				throw new ConcurrentModificationException();
+			}
+
+			if (nextEntry == null) {
+				throw new NoSuchElementException();
+			}
+
+			return advanceIterator();
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Read-only iterator");
+		}
+	}
+}
\ No newline at end of file

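A minimal sketch of the snapshot lifecycle the table above is built for (illustrative
only, not part of this commit; 'stateTable', 'keyGroupRange' and 'outView' are assumed
to be an in-scope CopyOnWriteStateTable, its KeyGroupRange and a DataOutputView, and
exception handling is elided):

    CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot =
            stateTable.createSnapshot();
    try {
        // checkpointing thread: serialize every key-group covered by this backend
        for (int kg = keyGroupRange.getStartKeyGroup(); kg <= keyGroupRange.getEndKeyGroup(); ++kg) {
            snapshot.writeMappingsInKeyGroup(outView, kg);
        }
        // meanwhile, the processing thread may keep mutating the table; such mutations
        // trigger copy-on-write for exactly those entries the snapshot still needs
    } finally {
        snapshot.release(); // the table can then stop copying on behalf of this snapshot
    }
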
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
new file mode 100644
index 0000000..c83fce0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import java.io.IOException;
+
+/**
+ * This class represents the snapshot of a {@link CopyOnWriteStateTable} and has a role in operator state checkpointing. Besides
+ * holding the {@link CopyOnWriteStateTable}'s internal entries at the time of the snapshot, this class is also responsible for
+ * preparing and writing the state in the process of checkpointing.
+ * <p>
+ * IMPORTANT: Please note that the snapshot integrity of entries in this class relies on proper copy-on-write semantics
+ * through the {@link CopyOnWriteStateTable} that created the snapshot object, but all objects in this snapshot must be considered
+ * as READ-ONLY! The reason is that the objects held by this class may or may not be deep copies of original objects
+ * that may still be used in the {@link CopyOnWriteStateTable}. This depends for each entry on whether or not it was subject to
+ * copy-on-write operations by the {@link CopyOnWriteStateTable}. Phrased differently: the {@link CopyOnWriteStateTable} provides
+ * copy-on-write isolation for this snapshot, but this snapshot does not isolate modifications from the
+ * {@link CopyOnWriteStateTable}!
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+@Internal
+public class CopyOnWriteStateTableSnapshot<K, N, S>
+		extends AbstractStateTableSnapshot<K, N, S, CopyOnWriteStateTable<K, N, S>> {
+
+	/**
+	 * Version of the {@link CopyOnWriteStateTable} when this snapshot was created. This can be used to release the snapshot.
+	 */
+	private final int snapshotVersion;
+
+	/**
+	 * The number of entries in the {@link CopyOnWriteStateTable} at the time of creating this snapshot.
+	 */
+	private final int stateTableSize;
+
+	/**
+	 * The state table entries, as of the time this snapshot was created. Objects in this array may or may not be deep
+	 * copies of the current entries in the {@link CopyOnWriteStateTable} that created this snapshot. This depends for each entry
+	 * on whether or not it was subject to copy-on-write operations by the {@link CopyOnWriteStateTable}.
+	 */
+	private final CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshotData;
+
+	/**
+	 * Offsets for the individual key-groups. This is lazily created when the snapshot is grouped by key-group during
+	 * the process of writing this snapshot to an output as part of checkpointing.
+	 */
+	private int[] keyGroupOffsets;
+
+	/**
+	 * Creates a new {@link CopyOnWriteStateTableSnapshot}.
+	 *
+	 * @param owningStateTable the {@link CopyOnWriteStateTable} for which this object represents a snapshot.
+	 */
+	CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K, N, S> owningStateTable) {
+
+		super(owningStateTable);
+		this.snapshotData = owningStateTable.snapshotTableArrays();
+		this.snapshotVersion = owningStateTable.getStateTableVersion();
+		this.stateTableSize = owningStateTable.size();
+		this.keyGroupOffsets = null;
+	}
+
+	/**
+	 * Returns the internal version of the {@link CopyOnWriteStateTable} when this snapshot was created. This value must be used to
+	 * tell the {@link CopyOnWriteStateTable} when to release this snapshot.
+	 */
+	int getSnapshotVersion() {
+		return snapshotVersion;
+	}
+
+	/**
+	 * Partitions the snapshot data by key-group. The algorithm first builds a histogram for the distribution of keys
+	 * into key-groups. Then, the histogram is accumulated to obtain the boundaries of each key-group in an array.
+	 * Last, we use the accumulated counts as write position pointers for the key-group's bins when reordering the
+	 * entries by key-group. This operation is lazily performed before the first writing of a key-group.
+	 * <p>
+	 * As a possible future optimization, we could perform the repartitioning in-place, using a scheme similar to the
+	 * cuckoo cycles in cuckoo hashing. This can trade some performance for a smaller memory footprint.
+	 */
+	@SuppressWarnings("unchecked")
+	private void partitionEntriesByKeyGroup() {
+
+		// We only have to perform this step once before the first key-group is written
+		if (null != keyGroupOffsets) {
+			return;
+		}
+
+		final KeyGroupRange keyGroupRange = owningStateTable.keyContext.getKeyGroupRange();
+		final int totalKeyGroups = owningStateTable.keyContext.getNumberOfKeyGroups();
+		final int baseKgIdx = keyGroupRange.getStartKeyGroup();
+		final int[] histogram = new int[keyGroupRange.getNumberOfKeyGroups() + 1];
+
+		CopyOnWriteStateTable.StateTableEntry<K, N, S>[] unfold = new CopyOnWriteStateTable.StateTableEntry[stateTableSize];
+
+		// 1) In this step we i) 'unfold' the linked list of entries to a flat array and ii) build a histogram for key-groups
+		int unfoldIndex = 0;
+		for (CopyOnWriteStateTable.StateTableEntry<K, N, S> entry : snapshotData) {
+			while (null != entry) {
+				int effectiveKgIdx =
+						KeyGroupRangeAssignment.computeKeyGroupForKeyHash(entry.key.hashCode(), totalKeyGroups) - baseKgIdx + 1;
+				++histogram[effectiveKgIdx];
+				unfold[unfoldIndex++] = entry;
+				entry = entry.next;
+			}
+		}
+
+		// 2) We accumulate the histogram bins to obtain key-group ranges in the final array
+		for (int i = 1; i < histogram.length; ++i) {
+			histogram[i] += histogram[i - 1];
+		}
+
+		// 3) We repartition the entries by key-group, using the histogram values as write indexes
+		for (CopyOnWriteStateTable.StateTableEntry<K, N, S> t : unfold) {
+			int effectiveKgIdx =
+					KeyGroupRangeAssignment.computeKeyGroupForKeyHash(t.key.hashCode(), totalKeyGroups) - baseKgIdx;
+			snapshotData[histogram[effectiveKgIdx]++] = t;
+		}
+
+		// 4) As a byproduct, we have also created the key-group offsets
+		this.keyGroupOffsets = histogram;
+	}
+
+	@Override
+	public void release() {
+		owningStateTable.releaseSnapshot(this);
+	}
+
+	@Override
+	public void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException {
+
+		if (null == keyGroupOffsets) {
+			partitionEntriesByKeyGroup();
+		}
+
+		final CopyOnWriteStateTable.StateTableEntry<K, N, S>[] groupedOut = snapshotData;
+		KeyGroupRange keyGroupRange = owningStateTable.keyContext.getKeyGroupRange();
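+		// after partitioning, keyGroupOffsets[i] holds the end offset of the i-th local key-group;
+		// a group's start offset is the previous group's end offset (or 0 for the first group)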
+		int keyGroupOffsetIdx = keyGroupId - keyGroupRange.getStartKeyGroup() - 1;
+		int startOffset = keyGroupOffsetIdx < 0 ? 0 : keyGroupOffsets[keyGroupOffsetIdx];
+		int endOffset = keyGroupOffsets[keyGroupOffsetIdx + 1];
+
+		TypeSerializer<K> keySerializer = owningStateTable.keyContext.getKeySerializer();
+		TypeSerializer<N> namespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer();
+		TypeSerializer<S> stateSerializer = owningStateTable.metaInfo.getStateSerializer();
+
+		// write number of mappings in key-group
+		dov.writeInt(endOffset - startOffset);
+
+		// write mappings
+		for (int i = startOffset; i < endOffset; ++i) {
+			CopyOnWriteStateTable.StateTableEntry<K, N, S> toWrite = groupedOut[i];
+			groupedOut[i] = null; // free asap for GC
+			namespaceSerializer.serialize(toWrite.namespace, dov);
+			keySerializer.serialize(toWrite.key, dov);
+			stateSerializer.serialize(toWrite.state, dov);
+		}
+	}
+
+	/**
+	 * Returns true iff the given state table is the owner of this snapshot object.
+	 */
+	boolean isOwner(CopyOnWriteStateTable<K, N, S> stateTable) {
+		return stateTable == owningStateTable;
+	}
+}
\ No newline at end of file

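The partitioning step above is a counting sort by key-group. A self-contained sketch of
the same idea, under the simplifying assumption that an entry's key-group is derived
directly from its key hash ('partitionByKeyGroup' is a hypothetical helper, not Flink code):

    // counting sort of key hashes by key-group (illustrative only)
    static int[] partitionByKeyGroup(int[] keyHashes, int numKeyGroups) {
        int[] histogram = new int[numKeyGroups + 1];
        for (int h : keyHashes) {
            ++histogram[Math.floorMod(h, numKeyGroups) + 1]; // counts, shifted by one slot
        }
        for (int i = 1; i <= numKeyGroups; ++i) {
            histogram[i] += histogram[i - 1]; // histogram[g] is now the start offset of group g
        }
        int[] out = new int[keyHashes.length];
        for (int h : keyHashes) {
            out[histogram[Math.floorMod(h, numKeyGroups)]++] = h; // ends as group g's end offset
        }
        return out; // entries grouped by key-group; histogram holds the group boundaries
    }
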
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 624b83e..64fc1db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -23,18 +23,16 @@ import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Heap-backed partitioned {@link AggregatingState} that is
  * snapshotted into files.
- * 
+ *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <IN> The type of the value added to the state.
@@ -45,13 +43,11 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
 		extends AbstractHeapMergingState<K, N, IN, OUT, ACC, AggregatingState<IN, OUT>, AggregatingStateDescriptor<IN, ACC, OUT>>
 		implements InternalAggregatingState<N, IN, OUT> {
 
-	private final AggregateFunction<IN, ACC, OUT> aggFunction;
+	private final AggregateTransformation<IN, ACC, OUT> aggregateTransformation;
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param backend
-	 *             The state backend backing that created this state.
 	 * @param stateDesc
 	 *             The state identifier for the state. This contains name and can create a default state value.
 	 * @param stateTable
@@ -60,14 +56,13 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
 	 *             The serializer for the type that indicates the namespace
 	 */
 	public HeapAggregatingState(
-			KeyedStateBackend<K> backend,
 			AggregatingStateDescriptor<IN, ACC, OUT> stateDesc,
 			StateTable<K, N, ACC> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
 
-		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
-		this.aggFunction = stateDesc.getAggregateFunction();
+		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+		this.aggregateTransformation = new AggregateTransformation<>(stateDesc.getAggregateFunction());
 	}
 
 	// ------------------------------------------------------------------------
@@ -76,64 +71,25 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
 
 	@Override
 	public OUT get() {
-		final K key = backend.getCurrentKey();
-
-		checkState(currentNamespace != null, "No namespace set.");
-		checkState(key != null, "No key set.");
-
-		Map<N, Map<K, ACC>> namespaceMap =
-				stateTable.get(backend.getCurrentKeyGroupIndex());
-
-		if (namespaceMap == null) {
-			return null;
-		}
 
-		Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			return null;
-		}
-
-		ACC accumulator = keyedMap.get(key);
-		return aggFunction.getResult(accumulator);
+		ACC accumulator = stateTable.get(currentNamespace);
+		return accumulator != null ? aggregateTransformation.aggFunction.getResult(accumulator) : null;
 	}
 
 	@Override
 	public void add(IN value) throws IOException {
-		final K key = backend.getCurrentKey();
-
-		checkState(currentNamespace != null, "No namespace set.");
-		checkState(key != null, "No key set.");
+		final N namespace = currentNamespace;
 
 		if (value == null) {
 			clear();
 			return;
 		}
 
-		Map<N, Map<K, ACC>> namespaceMap =
-				stateTable.get(backend.getCurrentKeyGroupIndex());
-
-		if (namespaceMap == null) {
-			namespaceMap = createNewMap();
-			stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
-		}
-
-		Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			keyedMap = createNewMap();
-			namespaceMap.put(currentNamespace, keyedMap);
-		}
-
-		// if this is the first value for the key, create a new accumulator
-		ACC accumulator = keyedMap.get(key);
-		if (accumulator == null) {
-			accumulator = aggFunction.createAccumulator();
-			keyedMap.put(key, accumulator);
+		try {
+			stateTable.transform(namespace, value, aggregateTransformation);
+		} catch (Exception e) {
+			throw new IOException("Exception while applying AggregateFunction in aggregating state", e);
 		}
-
-		// 
-		aggFunction.add(value, accumulator);
 	}
 
 	// ------------------------------------------------------------------------
@@ -142,6 +98,24 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
 
 	@Override
 	protected ACC mergeState(ACC a, ACC b) throws Exception {
-		return aggFunction.merge(a, b);
+		return aggregateTransformation.aggFunction.merge(a, b);
+	}
+
+	static final class AggregateTransformation<IN, ACC, OUT> implements StateTransformationFunction<ACC, IN> {
+
+		private final AggregateFunction<IN, ACC, OUT> aggFunction;
+
+		AggregateTransformation(AggregateFunction<IN, ACC, OUT> aggFunction) {
+			this.aggFunction = Preconditions.checkNotNull(aggFunction);
+		}
+
+		@Override
+		public ACC apply(ACC accumulator, IN value) throws Exception {
+			if (accumulator == null) {
+				accumulator = aggFunction.createAccumulator();
+			}
+			aggFunction.add(value, accumulator);
+			return accumulator;
+		}
 	}
-}
+}
\ No newline at end of file

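The StateTransformationFunction introduced above collapses the former
get/createAccumulator/add/put sequence into a single state-table operation. A hedged
sketch of the same pattern with a hypothetical summing transformation ('stateTable'
and 'namespace' are assumed to be in scope):

    StateTransformationFunction<Long, Integer> addToSum =
            new StateTransformationFunction<Long, Integer>() {
                @Override
                public Long apply(Long previousState, Integer value) {
                    // a null previous state means this is the first value for the key/namespace
                    return previousState == null ? value.longValue() : previousState + value;
                }
            };

    try {
        // replaces: acc = table.get(ns); acc = f(acc, value); table.put(ns, acc);
        stateTable.transform(namespace, 17, addToSum);
    } catch (Exception e) {
        throw new IOException("Could not apply transformation.", e);
    }
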
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index 6df3f5d..dad6d0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -22,12 +22,11 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.util.Map;
 
 /**
  * Heap-backed partitioned {@link FoldingState} that is
@@ -43,24 +42,22 @@ public class HeapFoldingState<K, N, T, ACC>
 		implements InternalFoldingState<N, T, ACC> {
 
 	/** The function used to fold the state */
-	private final FoldFunction<T, ACC> foldFunction;
+	private final FoldTransformation<T, ACC> foldTransformation;
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param backend The state backend backing that created this state.
 	 * @param stateDesc The state identifier for the state. This contains name
 	 *                           and can create a default state value.
 	 * @param stateTable The state table to use in this key/value state. May contain initial state.
 	 */
 	public HeapFoldingState(
-			KeyedStateBackend<K> backend,
 			FoldingStateDescriptor<T, ACC> stateDesc,
 			StateTable<K, N, ACC> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
-		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
-		this.foldFunction = stateDesc.getFoldFunction();
+		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+		this.foldTransformation = new FoldTransformation<>(stateDesc);
 	}
 
 	// ------------------------------------------------------------------------
@@ -69,62 +66,37 @@ public class HeapFoldingState<K, N, T, ACC>
 
 	@Override
 	public ACC get() {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
-		Map<N, Map<K, ACC>> namespaceMap =
-				stateTable.get(backend.getCurrentKeyGroupIndex());
-
-		if (namespaceMap == null) {
-			return null;
-		}
-
-		Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			return null;
-		}
-
-		return keyedMap.get(backend.<K>getCurrentKey());
+		return stateTable.get(currentNamespace);
 	}
 
 	@Override
 	public void add(T value) throws IOException {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
 
 		if (value == null) {
 			clear();
 			return;
 		}
 
-		Map<N, Map<K, ACC>> namespaceMap =
-				stateTable.get(backend.getCurrentKeyGroupIndex());
-
-		if (namespaceMap == null) {
-			namespaceMap = createNewMap();
-			stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
+		try {
+			stateTable.transform(currentNamespace, value, foldTransformation);
+		} catch (Exception e) {
+			throw new IOException("Could not add value to folding state.", e);
 		}
+	}
 
-		Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			keyedMap = createNewMap();
-			namespaceMap.put(currentNamespace, keyedMap);
-		}
+	static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
 
-		ACC currentValue = keyedMap.get(backend.<K>getCurrentKey());
+		private final FoldingStateDescriptor<T, ACC> stateDescriptor;
+		private final FoldFunction<T, ACC> foldFunction;
 
-		try {
+		FoldTransformation(FoldingStateDescriptor<T, ACC> stateDesc) {
+			this.stateDescriptor = Preconditions.checkNotNull(stateDesc);
+			this.foldFunction = Preconditions.checkNotNull(stateDesc.getFoldFunction());
+		}
 
-			if (currentValue == null) {
-				keyedMap.put(backend.<K>getCurrentKey(),
-						foldFunction.fold(stateDesc.getDefaultValue(), value));
-			} else {
-				keyedMap.put(backend.<K>getCurrentKey(), foldFunction.fold(currentValue, value));
-			}
-		} catch (Exception e) {
-			throw new RuntimeException("Could not add value to folding state.", e);
+		@Override
+		public ACC apply(ACC previousState, T value) throws Exception {
+			return foldFunction.fold((previousState != null) ? previousState : stateDescriptor.getDefaultValue(), value);
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index a4a08c1..0335933 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -29,18 +30,15 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.migration.MigrationUtil;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
-import org.apache.flink.migration.runtime.state.filesystem.AbstractFsStateSnapshot;
-import org.apache.flink.migration.runtime.state.memory.AbstractMemStateSnapshot;
+import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
+import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
+import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -54,8 +52,6 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -74,6 +70,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and will serialize state to
@@ -94,7 +91,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * but we can't put them here because different key/value states with different types and
 	 * namespace types share this central list of tables.
 	 */
-	private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
+	private final HashMap<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
+
+	/**
+	 * Determines whether or not we run snapshots asynchronously. This impacts the choice of the underlying
+	 * {@link StateTable} implementation.
+	 */
+	private final boolean asynchronousSnapshots;
 
 	public HeapKeyedStateBackend(
 			TaskKvStateRegistry kvStateRegistry,
@@ -102,10 +105,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			ClassLoader userCodeClassLoader,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
+			boolean asynchronousSnapshots,
 			ExecutionConfig executionConfig) {
 
 		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
-
+		this.asynchronousSnapshots = asynchronousSnapshots;
 		LOG.info("Initializing heap keyed state backend with stream factory.");
 	}
 
@@ -124,7 +128,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
 			String stateName,
 			StateDescriptor.Type stateType,
-			TypeSerializer<N> namespaceSerializer, 
+			TypeSerializer<N> namespaceSerializer,
 			TypeSerializer<V> valueSerializer) {
 
 		final RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
@@ -134,7 +138,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
 
 		if (stateTable == null) {
-			stateTable = new StateTable<>(newMetaInfo, keyGroupRange);
+			stateTable = newStateTable(newMetaInfo);
 			stateTables.put(stateName, stateTable);
 		} else {
 			if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
@@ -152,7 +156,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			ValueStateDescriptor<V> stateDesc) throws Exception {
 
 		StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapValueState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+		return new HeapValueState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
 	@Override
@@ -170,7 +174,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				namespaceSerializer,
 				new ArrayListSerializer<T>(stateDesc.getElementSerializer()));
 
-		return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+		return new HeapListState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
 	@Override
@@ -179,7 +183,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			ReducingStateDescriptor<T> stateDesc) throws Exception {
 
 		StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapReducingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+		return new HeapReducingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
 	@Override
@@ -188,7 +192,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
 
 		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapAggregatingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+		return new HeapAggregatingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
 	@Override
@@ -197,83 +201,151 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
 
 		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-		return new HeapFoldingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+		return new HeapFoldingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
 	@Override
 	public <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer,
 			MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-		
+
 		StateTable<K, N, HashMap<UK, UV>> stateTable = tryRegisterStateTable(
 				stateDesc.getName(),
 				stateDesc.getType(),
 				namespaceSerializer,
 				new HashMapSerializer<>(stateDesc.getKeySerializer(), stateDesc.getValueSerializer()));
-		
-		return new HeapMapState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+
+		return new HeapMapState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public RunnableFuture<KeyGroupsStateHandle> snapshot(
-			long checkpointId,
-			long timestamp,
-			CheckpointStreamFactory streamFactory,
+	public RunnableFuture<KeyGroupsStateHandle> snapshot(
+			final long checkpointId,
+			final long timestamp,
+			final CheckpointStreamFactory streamFactory,
 			CheckpointOptions checkpointOptions) throws Exception {
 
 		if (stateTables.isEmpty()) {
 			return new DoneFuture<>(null);
 		}
 
-		try (CheckpointStreamFactory.CheckpointStateOutputStream stream = streamFactory.
-				createCheckpointStateOutputStream(checkpointId, timestamp)) {
-
-			DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
+		long syncStartTime = System.currentTimeMillis();
 
-			Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE,
-					"Too many KV-States: " + stateTables.size() +
-							". Currently at most " + Short.MAX_VALUE + " states are supported");
+		Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE,
+				"Too many KV-States: " + stateTables.size() +
+						". Currently at most " + Short.MAX_VALUE + " states are supported");
 
-			List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size());
+		List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size());
 
-			Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
+		final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
 
-			for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+		final Map<StateTable<K, ?, ?>, StateTableSnapshot> cowStateStableSnapshots = new HashedMap(stateTables.size());
 
-				RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
-				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
-						metaInfo.getStateType(),
-						metaInfo.getName(),
-						metaInfo.getNamespaceSerializer(),
-						metaInfo.getStateSerializer());
+		for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+			RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
+			KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
+					metaInfo.getStateType(),
+					metaInfo.getName(),
+					metaInfo.getNamespaceSerializer(),
+					metaInfo.getStateSerializer());
 
-				metaInfoProxyList.add(metaInfoProxy);
-				kVStateToId.put(kvState.getKey(), kVStateToId.size());
+			metaInfoProxyList.add(metaInfoProxy);
+			kVStateToId.put(kvState.getKey(), kVStateToId.size());
+			StateTable<K, ?, ?> stateTable = kvState.getValue();
+			if (null != stateTable) {
+				cowStateTableSnapshots.put(stateTable, stateTable.createSnapshot());
 			}
+		}
 
-			KeyedBackendSerializationProxy serializationProxy =
-					new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+		final KeyedBackendSerializationProxy serializationProxy =
+				new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+
+		//--------------------------------------------------- this is the end of the synchronous part
+
+		// implementation of the async IO operation, based on FutureTask
+		final AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
+				new AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
+
+					AtomicBoolean open = new AtomicBoolean(false);
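+					// guards the checkpoint stream so that it is opened and closed exactly once,
+					// even if the snapshot is cancelled concurrently from another thread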
+
+					@Override
+					public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
+						if (open.compareAndSet(false, true)) {
+							CheckpointStreamFactory.CheckpointStateOutputStream stream =
+									streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+							try {
+								cancelStreamRegistry.registerClosable(stream);
+								return stream;
+							} catch (Exception ex) {
+								open.set(false);
+								throw ex;
+							}
+						} else {
+							throw new IOException("Operation already opened.");
+						}
+					}
 
-			serializationProxy.write(outView);
+					@Override
+					public KeyGroupsStateHandle performOperation() throws Exception {
+						long asyncStartTime = System.currentTimeMillis();
+						CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+						DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
+						serializationProxy.write(outView);
 
-			int offsetCounter = 0;
-			long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
+						long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
 
-			for (int keyGroupIndex = keyGroupRange.getStartKeyGroup(); keyGroupIndex <= keyGroupRange.getEndKeyGroup(); keyGroupIndex++) {
-				keyGroupRangeOffsets[offsetCounter++] = stream.getPos();
-				outView.writeInt(keyGroupIndex);
-				for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
-					outView.writeShort(kVStateToId.get(kvState.getKey()));
-					writeStateTableForKeyGroup(outView, kvState.getValue(), keyGroupIndex);
-				}
-			}
+						for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
+							int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
+							keyGroupRangeOffsets[keyGroupPos] = stream.getPos();
+							outView.writeInt(keyGroupId);
+
+							for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+								outView.writeShort(kVStateToId.get(kvState.getKey()));
+								cowStateTableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(outView, keyGroupId);
+							}
+						}
+
+						if (open.compareAndSet(true, false)) {
+							StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
+							KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
+							final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
+
+							if (asynchronousSnapshots) {
+								LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.",
+										streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
+							}
+
+							return keyGroupsStateHandle;
+						} else {
+							throw new IOException("Checkpoint stream already closed.");
+						}
+					}
+
+					@Override
+					public void done(boolean canceled) {
+						if (open.compareAndSet(true, false)) {
+							CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+							if (null != stream) {
+								cancelStreamRegistry.unregisterClosable(stream);
+								IOUtils.closeQuietly(stream);
+							}
+						}
+						for (StateTableSnapshot snapshot : cowStateTableSnapshots.values()) {
+							snapshot.release();
+						}
+					}
+				};
 
-			StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
+		AsyncStoppableTaskWithCallback<KeyGroupsStateHandle> task = AsyncStoppableTaskWithCallback.from(ioCallable);
 
-			KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
-			final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
-			return new DoneFuture<>(keyGroupsStateHandle);
+		if (!asynchronousSnapshots) {
+			task.run();
 		}
+
+		LOG.info("Heap backend snapshot (" + streamFactory + ", synchronous part) in thread " +
+				Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms.");
+
+		return task;
 	}
 
 	@SuppressWarnings("deprecation")
@@ -292,42 +364,12 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 	}
 
-	private <N, S> void writeStateTableForKeyGroup(
-			DataOutputView outView,
-			StateTable<K, N, S> stateTable,
-			int keyGroupIndex) throws IOException {
-
-		TypeSerializer<N> namespaceSerializer = stateTable.getNamespaceSerializer();
-		TypeSerializer<S> stateSerializer = stateTable.getStateSerializer();
-
-		Map<N, Map<K, S>> namespaceMap = stateTable.get(keyGroupIndex);
-		if (namespaceMap == null) {
-			outView.writeByte(0);
-		} else {
-			outView.writeByte(1);
-
-			// number of namespaces
-			outView.writeInt(namespaceMap.size());
-			for (Map.Entry<N, Map<K, S>> namespace : namespaceMap.entrySet()) {
-				namespaceSerializer.serialize(namespace.getKey(), outView);
-
-				Map<K, S> entryMap = namespace.getValue();
-
-				// number of entries
-				outView.writeInt(entryMap.size());
-				for (Map.Entry<K, S> entry : entryMap.entrySet()) {
-					keySerializer.serialize(entry.getKey(), outView);
-					stateSerializer.serialize(entry.getValue(), outView);
-				}
-			}
-		}
-	}
-
 	@SuppressWarnings({"unchecked"})
 	private void restorePartitionedState(Collection<KeyGroupsStateHandle> state) throws Exception {
 
+		final Map<Integer, String> kvStatesById = new HashMap<>();
 		int numRegisteredKvStates = 0;
-		Map<Integer, String> kvStatesById = new HashMap<>();
+		stateTables.clear();
 
 		for (KeyGroupsStateHandle keyGroupsHandle : state) {
 
@@ -359,7 +401,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
 								new RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy);
 
-						stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
+						stateTable = newStateTable(registeredBackendStateMetaInfo);
 						stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable);
 						kvStatesById.put(numRegisteredKvStates, metaInfoSerializationProxy.getStateName());
 						++numRegisteredKvStates;
@@ -372,20 +414,20 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					fsDataInputStream.seek(offset);
 
 					int writtenKeyGroupIndex = inView.readInt();
-					assert writtenKeyGroupIndex == keyGroupIndex;
+
+					Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
+							"Unexpected key-group in restore.");
 
 					for (int i = 0; i < metaInfoList.size(); i++) {
 						int kvStateId = inView.readShort();
-
-						byte isPresent = inView.readByte();
-						if (isPresent == 0) {
-							continue;
-						}
-
 						StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId));
-						Preconditions.checkNotNull(stateTable);
 
-						readStateTableForKeyGroup(inView, stateTable, keyGroupIndex);
+						StateTableByKeyGroupReader keyGroupReader =
+								StateTableByKeyGroupReaders.readerForVersion(
+										stateTable,
+										serializationProxy.getRestoredVersion());
+
+						keyGroupReader.readMappingsInKeyGroup(inView, keyGroupIndex);
 					}
 				}
 			} finally {
@@ -395,38 +437,12 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 	}
 
-	private <N, S> void readStateTableForKeyGroup(
-			DataInputView inView,
-			StateTable<K, N, S> stateTable,
-			int keyGroupIndex) throws IOException {
-
-		TypeSerializer<N> namespaceSerializer = stateTable.getNamespaceSerializer();
-		TypeSerializer<S> stateSerializer = stateTable.getStateSerializer();
-
-		Map<N, Map<K, S>> namespaceMap = new HashMap<>();
-		stateTable.set(keyGroupIndex, namespaceMap);
-
-		int numNamespaces = inView.readInt();
-		for (int k = 0; k < numNamespaces; k++) {
-			N namespace = namespaceSerializer.deserialize(inView);
-			Map<K, S> entryMap = new HashMap<>();
-			namespaceMap.put(namespace, entryMap);
-
-			int numEntries = inView.readInt();
-			for (int l = 0; l < numEntries; l++) {
-				K key = keySerializer.deserialize(inView);
-				S state = stateSerializer.deserialize(inView);
-				entryMap.put(key, state);
-			}
-		}
-	}
-
 	@Override
 	public String toString() {
 		return "HeapKeyedStateBackend";
 	}
 
-	@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
+	@SuppressWarnings({"unchecked", "rawtypes", "DeprecatedIsStillUsed"})
 	@Deprecated
 	private void restoreOldSavepointKeyedState(
 			Collection<KeyGroupsStateHandle> stateHandles) throws IOException, ClassNotFoundException {
@@ -444,118 +460,18 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		for (Map.Entry<String, KvStateSnapshot<K, ?, ?, ?>> nameToState : namedStates.entrySet()) {
 
-			KvStateSnapshot<K, ?, ?, ?> genericSnapshot = nameToState.getValue();
-
-			final RestoredState restoredState;
-
-			if (genericSnapshot instanceof AbstractMemStateSnapshot) {
-
-				AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot =
-						(AbstractMemStateSnapshot<K, ?, ?, ?, ?>) nameToState.getValue();
+			final String stateName = nameToState.getKey();
+			final KvStateSnapshot<K, ?, ?, ?> genericSnapshot = nameToState.getValue();
 
-				restoredState = restoreHeapState(stateSnapshot);
-
-			} else if (genericSnapshot instanceof AbstractFsStateSnapshot) {
-
-				AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot =
-						(AbstractFsStateSnapshot<K, ?, ?, ?, ?>) nameToState.getValue();
-				restoredState = restoreFsState(stateSnapshot);
+			if (genericSnapshot instanceof MigrationRestoreSnapshot) {
+				MigrationRestoreSnapshot<K, ?, ?> stateSnapshot = (MigrationRestoreSnapshot<K, ?, ?>) genericSnapshot;
+				final StateTable restoredTable =
+						stateSnapshot.deserialize(stateName, this);
+				// add the named state to the backend
+				stateTables.put(stateName, restoredTable);
 			} else {
 				throw new IllegalStateException("Unknown state: " + genericSnapshot);
 			}
-
-			Map rawResultMap = restoredState.getRawResultMap();
-			TypeSerializer<?> namespaceSerializer = restoredState.getNamespaceSerializer();
-			TypeSerializer<?> stateSerializer = restoredState.getStateSerializer();
-
-			if (namespaceSerializer instanceof VoidSerializer) {
-				namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
-			}
-
-			Map nullNameSpaceFix = (Map) rawResultMap.remove(null);
-
-			if (null != nullNameSpaceFix) {
-				rawResultMap.put(VoidNamespace.INSTANCE, nullNameSpaceFix);
-			}
-
-			RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
-					new RegisteredBackendStateMetaInfo<>(
-							StateDescriptor.Type.UNKNOWN,
-							nameToState.getKey(),
-							namespaceSerializer,
-							stateSerializer);
-
-			StateTable<K, ?, ?> stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
-			stateTable.getState()[0] = rawResultMap;
-
-			// add named state to the backend
-			stateTables.put(registeredBackendStateMetaInfo.getName(), stateTable);
-		}
-	}
-
-	@SuppressWarnings("deprecation")
-	private RestoredState restoreHeapState(AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
-		return new RestoredState(
-				stateSnapshot.deserialize(),
-				stateSnapshot.getNamespaceSerializer(),
-				stateSnapshot.getStateSerializer());
-	}
-
-	@SuppressWarnings({"rawtypes", "unchecked", "deprecation"})
-	private RestoredState restoreFsState(AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
-		FileSystem fs = stateSnapshot.getFilePath().getFileSystem();
-		//TODO register closeable to support fast cancelation?
-		try (FSDataInputStream inStream = fs.open(stateSnapshot.getFilePath())) {
-
-			DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
-
-			final int numNamespaces = inView.readInt();
-			HashMap rawResultMap = new HashMap<>(numNamespaces);
-
-			TypeSerializer<K> keySerializer = stateSnapshot.getKeySerializer();
-			TypeSerializer<?> namespaceSerializer = stateSnapshot.getNamespaceSerializer();
-			TypeSerializer<?> stateSerializer = stateSnapshot.getStateSerializer();
-
-			for (int i = 0; i < numNamespaces; i++) {
-				Object namespace = namespaceSerializer.deserialize(inView);
-				final int numKV = inView.readInt();
-				Map<K, Object> namespaceMap = new HashMap<>(numKV);
-				rawResultMap.put(namespace, namespaceMap);
-				for (int j = 0; j < numKV; j++) {
-					K key = keySerializer.deserialize(inView);
-					Object value = stateSerializer.deserialize(inView);
-					namespaceMap.put(key, value);
-				}
-			}
-			return new RestoredState(rawResultMap, namespaceSerializer, stateSerializer);
-		} catch (Exception e) {
-			throw new IOException("Failed to restore state from file system", e);
-		}
-	}
-
-	@SuppressWarnings("rawtypes")
-	static final class RestoredState {
-
-		private final Map rawResultMap;
-		private final TypeSerializer<?> namespaceSerializer;
-		private final TypeSerializer<?> stateSerializer ;
-
-		public RestoredState(Map rawResultMap, TypeSerializer<?> namespaceSerializer, TypeSerializer<?> stateSerializer) {
-			this.rawResultMap = rawResultMap;
-			this.namespaceSerializer = namespaceSerializer;
-			this.stateSerializer = stateSerializer;
-		}
-
-		public Map getRawResultMap() {
-			return rawResultMap;
-		}
-
-		public TypeSerializer<?> getNamespaceSerializer() {
-			return namespaceSerializer;
-		}
-
-		public TypeSerializer<?> getStateSerializer() {
-			return stateSerializer;
 		}
 	}
 
@@ -567,15 +483,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	public int numStateEntries() {
 		int sum = 0;
 		for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
-			for (Map namespaceMap : stateTable.getState()) {
-				if (namespaceMap == null) {
-					continue;
-				}
-				Map<?, Map> typedMap = (Map<?, Map>) namespaceMap;
-				for (Map entriesMap : typedMap.values()) {
-					sum += entriesMap.size();
-				}
-			}
+			sum += stateTable.size();
 		}
 		return sum;
 	}
@@ -584,22 +492,22 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * Returns the total number of state entries across all keys for the given namespace.
 	 */
 	@VisibleForTesting
-	@SuppressWarnings("unchecked")
-	public <N> int numStateEntries(N namespace) {
+	public int numStateEntries(Object namespace) {
 		int sum = 0;
 		for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
-			for (Map namespaceMap : stateTable.getState()) {
-				if (namespaceMap == null) {
-					continue;
-				}
-				Map<?, Map> typedMap = (Map<?, Map>) namespaceMap;
-				Map singleNamespace = typedMap.get(namespace);
-				if (singleNamespace != null) {
-					sum += singleNamespace.size();
-				}
-			}
+			sum += stateTable.sizeOfNamespace(namespace);
 		}
 		return sum;
 	}
 
-}
+	public <N, V> StateTable<K, N, V> newStateTable(RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
+		return asynchronousSnapshots ?
+				new CopyOnWriteStateTable<>(this, newMetaInfo) :
+				new NestedMapsStateTable<>(this, newMetaInfo);
+	}
+
+	@Override
+	public boolean supportsAsynchronousSnapshots() {
+		return asynchronousSnapshots;
+	}
+}
\ No newline at end of file

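A note on the newStateTable() factory above: with asynchronousSnapshots enabled, the
backend now builds a CopyOnWriteStateTable, whose snapshots are cheap in the synchronous
checkpoint phase. Below is a minimal, self-contained sketch of the underlying
copy-on-write idea, deliberately simplified to whole-map granularity (the actual table
versions individual entries and buckets; none of the names below are Flink APIs):

import java.util.HashMap;
import java.util.Map;

/**
 * Conceptual sketch only, not the Flink implementation: shows how a snapshot
 * can read a consistent view while the task thread keeps mutating state.
 */
final class CowSketch<K, V> {

	private Map<K, V> current = new HashMap<>();
	private Map<K, V> frozen; // view handed to the async snapshot writer

	/** Task thread: O(1) "snapshot" that just shares the current map. */
	synchronized Map<K, V> snapshot() {
		frozen = current;
		return frozen;
	}

	/** Task thread: the first write after a snapshot copies the map lazily. */
	synchronized void put(K key, V value) {
		if (current == frozen) {
			current = new HashMap<>(current); // copy-on-write
		}
		current.put(key, value);
	}

	synchronized V get(K key) {
		return current.get(key);
	}

	/** Async writer: called once the snapshot has been written out. */
	synchronized void release() {
		frozen = null;
	}
}

The synchronous part of the checkpoint reduces to sharing the current structure; the
expensive serialization then runs against the frozen view while the task thread writes
into a lazily made copy.
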
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index 02c3067..d3f67f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -22,14 +22,11 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.Preconditions;
 
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
-import java.util.Map;
 
 /**
  * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted
@@ -46,18 +43,16 @@ public class HeapListState<K, N, V>
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
 	 *
-	 * @param backend The state backend backing that created this state.
 	 * @param stateDesc The state identifier for the state. This contains name
 	 *                           and can create a default state value.
 	 * @param stateTable The state table to use in this key/value state. May contain initial state.
 	 */
 	public HeapListState(
-			KeyedStateBackend<K> backend,
 			ListStateDescriptor<V> stateDesc,
 			StateTable<K, N, ArrayList<V>> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
-		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
 	// ------------------------------------------------------------------------
@@ -66,55 +61,24 @@ public class HeapListState<K, N, V>
 
 	@Override
 	public Iterable<V> get() {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
-		Map<N, Map<K, ArrayList<V>>> namespaceMap =
-				stateTable.get(backend.getCurrentKeyGroupIndex());
-
-		if (namespaceMap == null) {
-			return null;
-		}
-
-		Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			return null;
-		}
-
-		return keyedMap.get(backend.<K>getCurrentKey());
+		return stateTable.get(currentNamespace);
 	}
 
 	@Override
 	public void add(V value) {
-		Preconditions.checkState(currentNamespace != null, "No namespace set.");
-		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
+		final N namespace = currentNamespace;
 
 		if (value == null) {
 			clear();
 			return;
 		}
 
-		Map<N, Map<K, ArrayList<V>>> namespaceMap =
-				stateTable.get(backend.getCurrentKeyGroupIndex());
-
-		if (namespaceMap == null) {
-			namespaceMap = createNewMap();
-			stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
-		}
-
-		Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			keyedMap = createNewMap();
-			namespaceMap.put(currentNamespace, keyedMap);
-		}
-
-		ArrayList<V> list = keyedMap.get(backend.<K>getCurrentKey());
+		final StateTable<K, N, ArrayList<V>> map = stateTable;
+		ArrayList<V> list = map.get(namespace);
 
 		if (list == null) {
 			list = new ArrayList<>();
-			keyedMap.put(backend.<K>getCurrentKey(), list);
+			map.put(namespace, list);
 		}
 		list.add(value);
 	}
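
The simplified get()/add() above rely on the refactored StateTable being addressed
directly by (key, namespace) for the key currently set in the key context, instead of
walking keyGroup -> namespace -> key nested maps. A rough sketch of that flattened
addressing (hypothetical names, not the actual StateTable internals):

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Conceptual sketch (not Flink code): entries addressed by a composite
// (key, namespace) slot instead of three levels of nested maps. One flat
// structure per registered state also makes size() answer the simplified
// numStateEntries() from the earlier HeapKeyedStateBackend hunk directly.
final class FlatTableSketch<K, N, S> {

	private static final class Slot<K, N> {
		final K key;
		final N namespace;

		Slot(K key, N namespace) {
			this.key = key;
			this.namespace = namespace;
		}

		@Override
		public boolean equals(Object o) {
			if (!(o instanceof Slot)) {
				return false;
			}
			Slot<?, ?> other = (Slot<?, ?>) o;
			return Objects.equals(key, other.key)
					&& Objects.equals(namespace, other.namespace);
		}

		@Override
		public int hashCode() {
			return 31 * Objects.hashCode(key) + Objects.hashCode(namespace);
		}
	}

	private final Map<Slot<K, N>, S> entries = new HashMap<>();

	S get(K key, N namespace) {
		return entries.get(new Slot<>(key, namespace));
	}

	void put(K key, N namespace, S state) {
		entries.put(new Slot<>(key, namespace), state);
	}

	int size() {
		return entries.size();
	}
}
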
@@ -124,20 +88,7 @@ public class HeapListState<K, N, V>
 		Preconditions.checkState(namespace != null, "No namespace given.");
 		Preconditions.checkState(key != null, "No key given.");
 
-		Map<N, Map<K, ArrayList<V>>> namespaceMap =
-				stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
-
-		if (namespaceMap == null) {
-			return null;
-		}
-
-		Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
-
-		if (keyedMap == null) {
-			return null;
-		}
-
-		ArrayList<V> result = keyedMap.get(key);
+		ArrayList<V> result = stateTable.get(key, namespace);
 
 		if (result == null) {
 			return null;