You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/03/24 17:51:44 UTC

[2/3] flink git commit: [FLINK-5715] Asynchronous snapshots for heap-based keyed state backend (backport from 1.3)

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTable.java
new file mode 100644
index 0000000..6c9c14c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTable.java
@@ -0,0 +1,1066 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.TreeSet;
+
+/**
+ * Implementation of Flink's in-memory state tables with copy-on-write support. This map does not support null values
+ * for key or namespace.
+ * <p>
+ * {@link CopyOnWriteStateTable} sacrifices some peak performance and memory efficiency for features like incremental
+ * rehashing and asynchronous snapshots through copy-on-write. Copy-on-write tries to minimize the amount of copying by
+ * maintaining version meta data for both, the map structure and the state objects. However, we must often proactively
+ * copy state objects when we hand them to the user.
+ * <p>
+ * As with any state backend, users should not keep references to state objects that they obtained from the state
+ * backend beyond the scope of the user function calls.
+ * <p>
+ * Some brief maintenance notes:
+ * <p>
+ * 1) Flattening the underlying data structure from nested maps (namespace) -> (key) -> (state) to one flat map
+ * (key, namespace) -> (state) brings certain performance trade-offs. In theory, the flat map has one less level of
+ * indirection compared to the nested map. However, the nested map naturally de-duplicates namespace objects for which
+ * #equals() is true. This leads to potentially a lot of redundant namespace objects for the flattened version. Those,
+ * in turn, can again introduce more cache misses because we need to follow the namespace object on all operations to
+ * ensure entry identities. Obviously, copy-on-write can also add memory overhead. So does the meta data to track
+ * copy-on-write requirement (state and entry versions on {@link StateTableEntry}).
+ * <p>
+ * 2) A flat map structure is a lot easier when it comes to tracking copy-on-write of the map structure.
+ * <p>
+ * 3) Nested structure had the (never used) advantage that we can easily drop and iterate whole namespaces. This could
+ * give locality advantages for certain access pattern, e.g. iterating a namespace.
+ * <p>
+ * 4) Serialization format is changed from namespace-prefix compressed (as naturally provided from the old nested
+ * structure) to making all entries self contained as (key, namespace, state).
+ * <p>
+ * 5) We got rid of having multiple nested tables, one for each key-group. Instead, we partition state into key-groups
+ * on-the-fly, during the asynchronous part of a snapshot.
+ * <p>
+ * 6) Currently, a state table can only grow, but never shrinks on low load. We could easily add this if required.
+ * <p>
+ * 7) Heap based state backends like this can easily cause a lot of GC activity. Besides using G1 as garbage collector,
+ * we should provide an additional state backend that operates on off-heap memory. This would sacrifice peak performance
+ * (due to de/serialization of objects) for a lower, but more constant throughput and potentially huge simplifications
+ * w.r.t. copy-on-write.
+ * <p>
+ * 8) We could try a hybrid of a serialized and object based backends, where key and namespace of the entries are both
+ * serialized in one byte-array.
+ * <p>
+ * 9) We could consider smaller types (e.g. short) for the version counting and think about some reset strategy before
+ * overflows, when there is no snapshot running. However, this would have to touch all entries in the map.
+ * <p>
+ * This class was initially based on the {@link java.util.HashMap} implementation of the Android JDK, but is now heavily
+ * customized towards the use case of table for state entries.
+ *
+ * IMPORTANT: the contracts for this class rely on the user not holding any references to objects returned by this map
+ * beyond the life cycle of per-element operations. Or phrased differently, all get-update-put operations on a mapping
+ * should be within one call of processElement. Otherwise, the user must take care of taking deep copies, e.g. for
+ * caching purposes.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of value.
+ */
+public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implements Iterable<StateEntry<K, N, S>> {
+
+	/**
+	 * The logger. Note that it is created for {@link AsyncHeapKeyedStateBackend}, so messages from this class are
+	 * logged under the category of the backend and not under the name of this table class.
+	 */
+	private static final Logger LOG = LoggerFactory.getLogger(AsyncHeapKeyedStateBackend.class);
+
+	/**
+	 * Min capacity (other than zero) for a {@link CopyOnWriteStateTable}. Must be a power of two
+	 * greater than 1 (and less than 1 << 30).
+	 */
+	private static final int MINIMUM_CAPACITY = 4;
+
+	/**
+	 * Max capacity for a {@link CopyOnWriteStateTable}. Must be a power of two >= MINIMUM_CAPACITY.
+	 */
+	private static final int MAXIMUM_CAPACITY = 1 << 30;
+
+	/**
+	 * Minimum number of entries that one step of incremental rehashing migrates from the old to the new sub-table.
+	 * A step can migrate more entries than this, because it always finishes the bucket that it has started.
+	 */
+	private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;
+
+	/**
+	 * A shared, empty placeholder array. The constructor initializes both sub-tables to it, the rehash table is
+	 * reset to it whenever an incremental rehash completes, and {@link #isRehashing()} detects a running rehash by
+	 * comparing the rehash table against it by identity. Its length is half of MINIMUM_CAPACITY, so that doubling
+	 * it results in a minimum-sized table. Because the instance is shared between all tables, it is not supposed
+	 * to be written to.
+	 */
+	private static final StateTableEntry<?, ?, ?>[] EMPTY_TABLE = new StateTableEntry[MINIMUM_CAPACITY >>> 1];
+
+	/**
+	 * Empty entry that we use to bootstrap our StateEntryIterator.
+	 */
+	private static final StateTableEntry<?, ?, ?> ITERATOR_BOOTSTRAP_ENTRY = new StateTableEntry<>();
+
+	/**
+	 * Maintains an ordered set of version ids that are still in use by unreleased snapshots. This object also serves
+	 * as the lock that guards updates of the snapshot versions when a snapshot is taken or released.
+	 */
+	private final TreeSet<Integer> snapshotVersions;
+
+	/**
+	 * This is the primary entry array (hash directory) of the state table. If no incremental rehash is ongoing, this
+	 * is the only used table.
+	 **/
+	private StateTableEntry<K, N, S>[] primaryTable;
+
+	/**
+	 * We maintain a secondary entry array while performing an incremental rehash. The purpose is to slowly migrate
+	 * entries from the primary table to this resized table array. When all entries are migrated, this becomes the new
+	 * primary table. While no rehash is in progress, this references the shared EMPTY_TABLE.
+	 */
+	private StateTableEntry<K, N, S>[] incrementalRehashTable;
+
+	/**
+	 * The current number of mappings in the primary table.
+	 */
+	private int primaryTableSize;
+
+	/**
+	 * The current number of mappings in the rehash table.
+	 */
+	private int incrementalRehashTableSize;
+
+	/**
+	 * The next index for a step of incremental rehashing in the primary table. All buckets of the primary table
+	 * below this index have already been migrated to the rehash table.
+	 */
+	private int rehashIndex;
+
+	/**
+	 * The current version of this map. Used for copy-on-write mechanics. Incremented for every snapshot that is taken.
+	 */
+	private int stateTableVersion;
+
+	/**
+	 * The highest version of this map that is still required by any unreleased snapshot. This is 0 as long as no
+	 * snapshot is registered.
+	 */
+	private int highestRequiredSnapshotVersion;
+
+	/**
+	 * The last namespace that was actually inserted. This is a small optimization to reduce duplicate namespace objects.
+	 */
+	private N lastNamespace;
+
+	/**
+	 * The {@link CopyOnWriteStateTable} is rehashed when its size exceeds this threshold.
+	 * The value of this field is generally .75 * capacity of the most recently allocated table array, except when
+	 * the table was created with a capacity of zero. In that case the constructor sets it to -1, so that the first
+	 * insert triggers the creation of a table.
+	 */
+	private int threshold;
+
+	/**
+	 * Incremented by "structural modifications" (adding or removing an entry) to allow (best effort)
+	 * detection of concurrent modification.
+	 */
+	private int modCount;
+
+	/**
+	 * Constructs a new {@code CopyOnWriteStateTable} with the default initial capacity of 1024 buckets.
+	 *
+	 * @param keyContext the key context that provides the current key for the namespace-only operations.
+	 * @param metaInfo   the meta information, including the type serializer for state copy-on-write.
+	 */
+	CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+		this(keyContext, metaInfo, 1024);
+	}
+
+	/**
+	 * Constructs a new {@code StateTable} instance with the specified capacity.
+	 *
+	 * @param keyContext the key context.
+	 * @param metaInfo   the meta information, including the type serializer for state copy-on-write.
+	 * @param capacity   the initial capacity of this hash map.
+	 * @throws IllegalArgumentException when the capacity is less than zero.
+	 */
+	@SuppressWarnings("unchecked")
+	private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo, int capacity) {
+		super(keyContext, metaInfo);
+
+		// initialized tables to EMPTY_TABLE.
+		this.primaryTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
+		this.incrementalRehashTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
+
+		// initialize sizes to 0.
+		this.primaryTableSize = 0;
+		this.incrementalRehashTableSize = 0;
+
+		this.rehashIndex = 0;
+		this.stateTableVersion = 0;
+		this.highestRequiredSnapshotVersion = 0;
+		this.snapshotVersions = new TreeSet<>();
+
+		if (capacity < 0) {
+			throw new IllegalArgumentException("Capacity: " + capacity);
+		}
+
+		if (capacity == 0) {
+			threshold = -1;
+			return;
+		}
+
+		if (capacity < MINIMUM_CAPACITY) {
+			capacity = MINIMUM_CAPACITY;
+		} else if (capacity > MAXIMUM_CAPACITY) {
+			capacity = MAXIMUM_CAPACITY;
+		} else {
+			capacity = MathUtils.roundUpToPowerOfTwo(capacity);
+		}
+		primaryTable = makeTable(capacity);
+	}
+
+	// Public API from AbstractStateTable ------------------------------------------------------------------------------
+
+	/**
+	 * Counts the entries that are currently stored in this {@link CopyOnWriteStateTable}. During an incremental
+	 * rehash, entries are spread over both sub-tables, so the result is the sum of both sub-table sizes.
+	 *
+	 * @return the total number of entries in this {@link CopyOnWriteStateTable}.
+	 */
+	@Override
+	public int size() {
+		final int totalEntries = primaryTableSize + incrementalRehashTableSize;
+		return totalEntries;
+	}
+
+	@Override
+	public S get(K key, N namespace) {
+
+		final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+		final int requiredVersion = highestRequiredSnapshotVersion;
+		final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+		int index = hash & (tab.length - 1);
+
+		for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
+			final K eKey = e.key;
+			final N eNamespace = e.namespace;
+			if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {
+
+				// copy-on-write check for state
+				if (e.stateVersion < requiredVersion) {
+					// copy-on-write check for entry
+					if (e.entryVersion < requiredVersion) {
+						e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
+					}
+					e.stateVersion = stateTableVersion;
+					e.state = getStateSerializer().copy(e.state);
+				}
+
+				return e.state;
+			}
+		}
+
+		return null;
+	}
+
+	/**
+	 * Maps the composite of the given key and namespace to the given state. The key-group is not used by this
+	 * implementation, the call is forwarded to the variant without key-group.
+	 */
+	@Override
+	public void put(K key, int keyGroup, N namespace, S state) {
+		this.put(key, namespace, state);
+	}
+
+	/**
+	 * Returns the state for the composite of the current key (as provided by the key context) and the given namespace.
+	 */
+	@Override
+	public S get(N namespace) {
+		final K currentKey = keyContext.getCurrentKey();
+		return get(currentKey, namespace);
+	}
+
+	/**
+	 * Returns whether a mapping exists for the composite of the current key (as provided by the key context) and the
+	 * given namespace.
+	 */
+	@Override
+	public boolean containsKey(N namespace) {
+		final K currentKey = keyContext.getCurrentKey();
+		return containsKey(currentKey, namespace);
+	}
+
+	/**
+	 * Maps the composite of the current key (as provided by the key context) and the given namespace to the given state.
+	 */
+	@Override
+	public void put(N namespace, S state) {
+		final K currentKey = keyContext.getCurrentKey();
+		put(currentKey, namespace, state);
+	}
+
+	/**
+	 * Maps the composite of the current key (as provided by the key context) and the given namespace to the given
+	 * state and returns the state that was previously registered under the composite key.
+	 */
+	@Override
+	public S putAndGetOld(N namespace, S state) {
+		final K currentKey = keyContext.getCurrentKey();
+		return putAndGetOld(currentKey, namespace, state);
+	}
+
+	/**
+	 * Removes the mapping for the composite of the current key (as provided by the key context) and the given namespace.
+	 */
+	@Override
+	public void remove(N namespace) {
+		final K currentKey = keyContext.getCurrentKey();
+		remove(currentKey, namespace);
+	}
+
+	/**
+	 * Removes the mapping for the composite of the current key (as provided by the key context) and the given
+	 * namespace and returns the state that was registered under the composite key.
+	 */
+	@Override
+	public S removeAndGetOld(N namespace) {
+		final K currentKey = keyContext.getCurrentKey();
+		return removeAndGetOld(currentKey, namespace);
+	}
+
+	/**
+	 * Applies the given transformation to the state that is registered for the composite of the current key (as
+	 * provided by the key context) and the given namespace.
+	 */
+	@Override
+	public <T> void transform(N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception {
+		final K currentKey = keyContext.getCurrentKey();
+		transform(currentKey, namespace, value, transformation);
+	}
+
+	// Private implementation details of the API methods ---------------------------------------------------------------
+
+	/**
+	 * Checks whether a mapping exists for the given key/namespace composite key. This is a pure lookup with respect
+	 * to the entries, but like all operations it can perform some steps of an ongoing incremental rehash.
+	 *
+	 * @param key       the key in the composite key to search for. Not null.
+	 * @param namespace the namespace in the composite key to search for. Not null.
+	 * @return {@code true} if this map contains the specified key/namespace composite key,
+	 * {@code false} otherwise.
+	 */
+	boolean containsKey(K key, N namespace) {
+
+		final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+		final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+		final int bucket = hash & (tab.length - 1);
+
+		// walk the collision chain of the bucket
+		StateTableEntry<K, N, S> candidate = tab[bucket];
+		while (candidate != null) {
+			if (candidate.hash == hash && key.equals(candidate.key) && namespace.equals(candidate.namespace)) {
+				return true;
+			}
+			candidate = candidate.next;
+		}
+		return false;
+	}
+
+	/**
+	 * Maps the specified key/namespace composite key to the specified value. This method should be preferred
+	 * over {@link #putAndGetOld(Object, Object, Object)} when the caller is not interested in the old value,
+	 * because the old state object is simply replaced and never needs to be copied.
+	 *
+	 * @param key       the key. Not null.
+	 * @param namespace the namespace. Not null.
+	 * @param value     the value. Can be null.
+	 */
+	void put(K key, N namespace, S value) {
+		final StateTableEntry<K, N, S> entry = putEntry(key, namespace);
+
+		// the new state object belongs to the current table version
+		entry.stateVersion = stateTableVersion;
+		entry.state = value;
+	}
+
+	/**
+	 * Maps the specified key/namespace composite key to the specified value and hands out the state that was
+	 * registered under the composite key before. If the previous state object is still required by a snapshot, a copy
+	 * is returned instead of the original object.
+	 *
+	 * @param key       the key. Not null.
+	 * @param namespace the namespace. Not null.
+	 * @param value     the value. Can be null.
+	 * @return the value of any previous mapping with the specified key or
+	 * {@code null} if there was no such mapping.
+	 */
+	S putAndGetOld(K key, N namespace, S value) {
+
+		final StateTableEntry<K, N, S> entry = putEntry(key, namespace);
+
+		// copy-on-write check for state
+		final S previousState;
+		if (entry.stateVersion < highestRequiredSnapshotVersion) {
+			previousState = getStateSerializer().copy(entry.state);
+		} else {
+			previousState = entry.state;
+		}
+
+		entry.state = value;
+		entry.stateVersion = stateTableVersion;
+
+		return previousState;
+	}
+
+	/**
+	 * Drops the mapping with the specified key/namespace composite key from this map, if there is one. Prefer this
+	 * method over {@link #removeAndGetOld(Object, Object)} when the old value is not needed, because the removed state
+	 * object is never copied here.
+	 *
+	 * @param key       the key of the mapping to remove. Not null.
+	 * @param namespace the namespace of the mapping to remove. Not null.
+	 */
+	void remove(K key, N namespace) {
+		// the removed entry is of no interest to the caller
+		this.removeEntry(key, namespace);
+	}
+
+	/**
+	 * Drops the mapping with the specified key/namespace composite key from this map and hands out the state of the
+	 * removed entry. If the state object is still required by a snapshot, a copy is returned instead of the original
+	 * object.
+	 *
+	 * @param key       the key of the mapping to remove. Not null.
+	 * @param namespace the namespace of the mapping to remove. Not null.
+	 * @return the value of the removed mapping or {@code null} if no mapping
+	 * for the specified key was found.
+	 */
+	S removeAndGetOld(K key, N namespace) {
+
+		final StateTableEntry<K, N, S> removed = removeEntry(key, namespace);
+
+		if (removed == null) {
+			return null;
+		}
+
+		// copy-on-write check for state
+		if (removed.stateVersion < highestRequiredSnapshotVersion) {
+			return getStateSerializer().copy(removed.state);
+		}
+
+		return removed.state;
+	}
+
+	/**
+	 * Applies the given transformation to the state of the key/namespace composite key and stores the result as the
+	 * new state. An entry is created if there is no mapping yet. The transformation receives a copy of the old state
+	 * if the old state object is still required by a snapshot.
+	 *
+	 * @param key            the key of the mapping to transform. Not null.
+	 * @param namespace      the namespace of the mapping to transform. Not null.
+	 * @param value          the value that is the second input for the transformation.
+	 * @param transformation the transformation function to apply on the old state and the given value.
+	 * @param <T>            type of the value that is the second input to the {@link StateTransformationFunction}.
+	 * @throws Exception exception that happen on applying the function.
+	 * @see #transform(Object, Object, StateTransformationFunction)
+	 */
+	<T> void transform(
+			K key,
+			N namespace,
+			T value,
+			StateTransformationFunction<S, T> transformation) throws Exception {
+
+		final StateTableEntry<K, N, S> entry = putEntry(key, namespace);
+
+		// copy-on-write check for state
+		final S previousState;
+		if (entry.stateVersion < highestRequiredSnapshotVersion) {
+			previousState = getStateSerializer().copy(entry.state);
+		} else {
+			previousState = entry.state;
+		}
+
+		entry.state = transformation.apply(previousState, value);
+		entry.stateVersion = stateTableVersion;
+	}
+
+	/**
+	 * Helper method that is the basis for operations that add mappings. Finds the entry for the key/namespace
+	 * composite key or creates a new entry with {@code null} state if there is none. The returned entry is safe to
+	 * modify, i.e. copy-on-write for the entry was already performed if a snapshot still requires the old entry.
+	 */
+	private StateTableEntry<K, N, S> putEntry(K key, N namespace) {
+
+		final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+		final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+		final int bucket = hash & (tab.length - 1);
+
+		StateTableEntry<K, N, S> candidate = tab[bucket];
+		while (candidate != null) {
+			if (candidate.hash == hash && key.equals(candidate.key) && namespace.equals(candidate.namespace)) {
+
+				// the entry is recent enough, we can modify it in place
+				if (candidate.entryVersion >= highestRequiredSnapshotVersion) {
+					return candidate;
+				}
+
+				// copy-on-write for the entry
+				return handleChainedEntryCopyOnWrite(tab, bucket, candidate);
+			}
+			candidate = candidate.next;
+		}
+
+		// no mapping found, this is a structural modification
+		++modCount;
+		if (size() > threshold) {
+			doubleCapacity();
+		}
+
+		return addNewStateTableEntry(tab, key, namespace, hash);
+	}
+
+	/**
+	 * Helper method that is the basis for operations that remove mappings. Unlinks the entry for the key/namespace
+	 * composite key from its collision chain and returns it, or returns {@code null} if there is no such entry.
+	 */
+	private StateTableEntry<K, N, S> removeEntry(K key, N namespace) {
+
+		final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+		final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+		final int bucket = hash & (tab.length - 1);
+
+		StateTableEntry<K, N, S> predecessor = null;
+		StateTableEntry<K, N, S> current = tab[bucket];
+
+		while (current != null) {
+			if (current.hash == hash && key.equals(current.key) && namespace.equals(current.namespace)) {
+
+				if (predecessor != null) {
+					// we must modify the predecessor's link, so copy-on-write check for the predecessor entry
+					if (predecessor.entryVersion < highestRequiredSnapshotVersion) {
+						predecessor = handleChainedEntryCopyOnWrite(tab, bucket, predecessor);
+					}
+					predecessor.next = current.next;
+				} else {
+					// the removed entry is the head of the chain
+					tab[bucket] = current.next;
+				}
+
+				++modCount;
+
+				// update the size of the sub-table that contained the entry
+				if (tab == primaryTable) {
+					--primaryTableSize;
+				} else {
+					--incrementalRehashTableSize;
+				}
+				return current;
+			}
+
+			predecessor = current;
+			current = current.next;
+		}
+		return null;
+	}
+
+	/**
+	 * Ensures that neither the key nor the namespace of an operation is null. The key is checked first, so a missing
+	 * key is reported even if the namespace is missing as well.
+	 *
+	 * @param key       the key to check.
+	 * @param namespace the namespace to check.
+	 */
+	private void checkKeyNamespacePreconditions(K key, N namespace) {
+		Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
+		Preconditions.checkNotNull(namespace, "Provided namespace is null.");
+	}
+
+	// Meta data setter / getter and toString --------------------------------------------------------------------------
+
+	/**
+	 * Returns the state serializer from the current meta info. This table uses it to copy state objects for
+	 * copy-on-write.
+	 */
+	@Override
+	public TypeSerializer<S> getStateSerializer() {
+		return metaInfo.getStateSerializer();
+	}
+
+	/**
+	 * Returns the namespace serializer from the current meta info.
+	 */
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return metaInfo.getNamespaceSerializer();
+	}
+
+	/**
+	 * Returns the meta info that is currently registered for this table.
+	 */
+	@Override
+	public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() {
+		return metaInfo;
+	}
+
+	/**
+	 * Replaces the meta info of this table. Subsequent calls to the serializer getters are answered from the new
+	 * meta info.
+	 *
+	 * @param metaInfo the new meta info.
+	 */
+	@Override
+	public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+		this.metaInfo = metaInfo;
+	}
+
+	// Iteration  ------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a new {@code StateEntryIterator} for the entries of this table.
+	 */
+	@Override
+	public Iterator<StateEntry<K, N, S>> iterator() {
+		return new StateEntryIterator();
+	}
+
+	// Private utility functions for StateTable management -------------------------------------------------------------
+
+	/**
+	 * @see #releaseSnapshot(CopyOnWriteStateTableSnapshot)
+	 */
+	@VisibleForTesting
+	void releaseSnapshot(int snapshotVersion) {
+		// we guard against concurrent modifications of highestRequiredSnapshotVersion between snapshot and release.
+		// Only stale reads of from the result of #releaseSnapshot calls are ok.
+		synchronized (snapshotVersions) {
+			Preconditions.checkState(snapshotVersions.remove(snapshotVersion), "Attempt to release unknown snapshot version");
+			highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ? 0 : snapshotVersions.last();
+		}
+	}
+
+	/**
+	 * Registers a new snapshot version and creates a (combined) copy of the table arrays for that snapshot. Only the
+	 * arrays are copied, the entries in the copy are shared with the live table. This method must be called by the
+	 * same Thread that does modifications to the {@link CopyOnWriteStateTable}.
+	 *
+	 * @return a new array that references the head entries of all buckets that can contain entries, from both
+	 * sub-tables if an incremental rehash is in progress.
+	 * @throws IllegalStateException if the version counter overflows.
+	 */
+	@VisibleForTesting
+	@SuppressWarnings("unchecked")
+	StateTableEntry<K, N, S>[] snapshotTableArrays() {
+
+		// we guard against concurrent modifications of highestRequiredSnapshotVersion between snapshot and release.
+		// Only stale reads of the result of #releaseSnapshot calls are ok. This is why we must call this method
+		// from the same thread that does all the modifications to the table.
+		synchronized (snapshotVersions) {
+
+			// increase the table version for copy-on-write and register the snapshot
+			if (++stateTableVersion < 0) {
+				// this is just a safety net against overflows, but should never happen in practice (i.e., only after 2^31 snapshots)
+				throw new IllegalStateException("Version count overflow in CopyOnWriteStateTable. Enforcing restart.");
+			}
+
+			highestRequiredSnapshotVersion = stateTableVersion;
+			snapshotVersions.add(highestRequiredSnapshotVersion);
+		}
+
+		StateTableEntry<K, N, S>[] table = primaryTable;
+		if (isRehashing()) {
+			// consider both tables for the snapshot, the rehash index tells us which part of the two tables we need
+			final int localRehashIndex = rehashIndex;
+			final int localCopyLength = table.length - localRehashIndex;
+			// room for the unmigrated part of the primary table plus two regions of rhIdx buckets in the new table
+			StateTableEntry<K, N, S>[] copy = new StateTableEntry[localRehashIndex + table.length];
+			// for the primary table, take every index >= rhIdx.
+			System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);
+
+			// for the new table, we are sure that two regions contain all the entries:
+			// [0, rhIdx[ AND [table.length / 2, table.length / 2 + rhIdx[
+			// (an entry from old bucket i ends up in new bucket i or in new bucket i + old capacity)
+			table = incrementalRehashTable;
+			System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
+			System.arraycopy(table, table.length >>> 1, copy, localCopyLength + localRehashIndex, localRehashIndex);
+
+			return copy;
+		} else {
+			// we only need to copy the primary table
+			return Arrays.copyOf(table, table.length);
+		}
+	}
+
+	/**
+	 * Allocate a table of the given capacity and set the threshold accordingly.
+	 *
+	 * @param newCapacity must be a power of two
+	 */
+	private StateTableEntry<K, N, S>[] makeTable(int newCapacity) {
+
+		if (MAXIMUM_CAPACITY == newCapacity) {
+			LOG.warn("Maximum capacity of 2^30 in StateTable reached. Cannot increase hash table size. This can lead " +
+					"to more collisions and lower performance. Please consider scaling-out your job or using a " +
+					"different keyed state backend implementation!");
+		}
+
+		threshold = (newCapacity >> 1) + (newCapacity >> 2); // 3/4 capacity
+		@SuppressWarnings("unchecked") StateTableEntry<K, N, S>[] newTable
+				= (StateTableEntry<K, N, S>[]) new StateTableEntry[newCapacity];
+		return newTable;
+	}
+
+	/**
+	 * Creates and inserts a new {@link StateTableEntry}.
+	 */
+	private StateTableEntry<K, N, S> addNewStateTableEntry(
+			StateTableEntry<K, N, S>[] table,
+			K key,
+			N namespace,
+			int hash) {
+
+		// small optimization that aims to avoid holding references on duplicate namespace objects
+		if (namespace.equals(lastNamespace)) {
+			namespace = lastNamespace;
+		} else {
+			lastNamespace = namespace;
+		}
+
+		int index = hash & (table.length - 1);
+		StateTableEntry<K, N, S> newEntry = new StateTableEntry<>(
+				key,
+				namespace,
+				null,
+				hash,
+				table[index],
+				stateTableVersion,
+				stateTableVersion);
+		table[index] = newEntry;
+
+		if (table == primaryTable) {
+			++primaryTableSize;
+		} else {
+			++incrementalRehashTableSize;
+		}
+		return newEntry;
+	}
+
+	/**
+	 * Select the sub-table which is responsible for entries with the given hash code.
+	 *
+	 * @param hashCode the hash code which we use to decide about the table that is responsible.
+	 * @return the index of the sub-table that is responsible for the entry with the given hash code.
+	 */
+	private StateTableEntry<K, N, S>[] selectActiveTable(int hashCode) {
+		return (hashCode & (primaryTable.length - 1)) >= rehashIndex ? primaryTable : incrementalRehashTable;
+	}
+
+	/**
+	 * Doubles the capacity of the hash table. Existing entries are placed in
+	 * the correct bucket on the enlarged table. If the current capacity is,
+	 * MAXIMUM_CAPACITY, this method is a no-op. Returns the table, which
+	 * will be new unless we were already at MAXIMUM_CAPACITY.
+	 */
+	private void doubleCapacity() {
+
+		// There can only be one rehash in flight. From the amount of incremental rehash steps we take, this should always hold.
+		Preconditions.checkState(!isRehashing(), "There is already a rehash in progress.");
+
+		StateTableEntry<K, N, S>[] oldTable = primaryTable;
+
+		int oldCapacity = oldTable.length;
+
+		if (oldCapacity == MAXIMUM_CAPACITY) {
+			return;
+		}
+
+		incrementalRehashTable = makeTable(oldCapacity * 2);
+	}
+
+	/**
+	 * Returns true, if an incremental rehash is in progress. This is detected by an identity comparison of the
+	 * rehash table with the shared EMPTY_TABLE placeholder.
+	 */
+	@VisibleForTesting
+	boolean isRehashing() {
+		// if we rehash, the secondary table is not the shared empty placeholder
+		return EMPTY_TABLE != incrementalRehashTable;
+	}
+
+	/**
+	 * Computes the hash for the composite of key and namespace and performs some steps of incremental rehash if
+	 * incremental rehashing is in progress. Because the rehash can move entries between the sub-tables, callers
+	 * must select the active table only after this method has returned.
+	 *
+	 * @param key       the key of the operation. Must not be null.
+	 * @param namespace the namespace of the operation. Must not be null.
+	 * @return the composite hash of key and namespace.
+	 */
+	private int computeHashForOperationAndDoIncrementalRehash(K key, N namespace) {
+
+		// fail early on null key or null namespace
+		checkKeyNamespacePreconditions(key, namespace);
+
+		if (isRehashing()) {
+			incrementalRehash();
+		}
+
+		return compositeHash(key, namespace);
+	}
+
+	/**
+	 * Runs a number of steps for incremental rehashing. Starting at the rehash index, whole buckets are migrated from
+	 * the primary table to the incremental rehash table until at least MIN_TRANSFERRED_PER_INCREMENTAL_REHASH entries
+	 * were moved or the end of the primary table is reached. In the latter case the rehash is complete and the rehash
+	 * table becomes the new primary table.
+	 */
+	@SuppressWarnings("unchecked")
+	private void incrementalRehash() {
+
+		StateTableEntry<K, N, S>[] oldTable = primaryTable;
+		StateTableEntry<K, N, S>[] newTable = incrementalRehashTable;
+
+		int oldCapacity = oldTable.length;
+		int newMask = newTable.length - 1;
+		int requiredVersion = highestRequiredSnapshotVersion;
+		int rhIdx = rehashIndex;
+		int transferred = 0;
+
+		// we migrate a certain minimum amount of entries from the old to the new table
+		while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {
+
+			StateTableEntry<K, N, S> e = oldTable[rhIdx];
+
+			// a bucket is always migrated completely, entry by entry
+			while (e != null) {
+				// copy-on-write check for entry: we change the next-pointer, so entries that a snapshot can still
+				// see are replaced by a copy
+				if (e.entryVersion < requiredVersion) {
+					e = new StateTableEntry<>(e, stateTableVersion);
+				}
+				// remember the successor in the old chain before we relink the entry as head of its new bucket
+				StateTableEntry<K, N, S> n = e.next;
+				int pos = e.hash & newMask;
+				e.next = newTable[pos];
+				newTable[pos] = e;
+				e = n;
+				++transferred;
+			}
+
+			oldTable[rhIdx] = null;
+			if (++rhIdx == oldCapacity) {
+				//here, the rehash is complete and we release resources and reset fields
+				primaryTable = newTable;
+				incrementalRehashTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
+				// the size fields were not yet adjusted for this call, but their sum is the total number of entries
+				primaryTableSize += incrementalRehashTableSize;
+				incrementalRehashTableSize = 0;
+				rehashIndex = 0;
+				return;
+			}
+		}
+
+		// sync our local bookkeeping with the official bookkeeping fields
+		primaryTableSize -= transferred;
+		incrementalRehashTableSize += transferred;
+		rehashIndex = rhIdx;
+	}
+
+	/**
+	 * Perform copy-on-write for entry chains. We iterate the (hopefully and probably) still cached chain, replace
+	 * all links up to the 'untilEntry', which we actually wanted to modify. Only entries with an entry version below
+	 * the highest required snapshot version are replaced by copies, more recent entries stay in the chain as they are.
+	 *
+	 * @param tab        the sub-table that contains the chain.
+	 * @param tableIdx   the index of the bucket in the sub-table that contains the chain.
+	 * @param untilEntry the entry in the chain that the caller wants to modify. The caller must ensure that this
+	 *                   entry is part of the chain, the iteration does not check for the end of the chain.
+	 * @return the entry that takes the place of 'untilEntry' in the chain. This is a copy if the entry version of
+	 * 'untilEntry' is below the required version, and 'untilEntry' itself otherwise.
+	 */
+	private StateTableEntry<K, N, S> handleChainedEntryCopyOnWrite(
+			StateTableEntry<K, N, S>[] tab,
+			int tableIdx,
+			StateTableEntry<K, N, S> untilEntry) {
+
+		final int required = highestRequiredSnapshotVersion;
+
+		// 'current' walks the original chain, 'copy' is the entry that takes its place in the live chain
+		StateTableEntry<K, N, S> current = tab[tableIdx];
+		StateTableEntry<K, N, S> copy;
+
+		if (current.entryVersion < required) {
+			// the head of the chain is replaced in the table array
+			copy = new StateTableEntry<>(current, stateTableVersion);
+			tab[tableIdx] = copy;
+		} else {
+			// nothing to do, just advance copy to current
+			copy = current;
+		}
+
+		// we iterate the chain up to 'until entry'
+		while (current != untilEntry) {
+
+			//advance current
+			current = current.next;
+
+			if (current.entryVersion < required) {
+				// copy and advance the current's copy
+				copy.next = new StateTableEntry<>(current, stateTableVersion);
+				copy = copy.next;
+			} else {
+				// nothing to do, just advance copy to current
+				copy = current;
+			}
+		}
+
+		return copy;
+	}
+
+	/**
+	 * Returns the shared ITERATOR_BOOTSTRAP_ENTRY, cast to the type parameters of the caller.
+	 */
+	@SuppressWarnings("unchecked")
+	private static <K, N, S> StateTableEntry<K, N, S> getBootstrapEntry() {
+		return (StateTableEntry<K, N, S>) ITERATOR_BOOTSTRAP_ENTRY;
+	}
+
+	/**
+	 * Helper function that creates and scrambles a composite hash for key and namespace.
+	 */
+	private static int compositeHash(Object key, Object namespace) {
+		// create composite key through XOR, then apply some bit-mixing for better distribution of skewed keys.
+		return MathUtils.bitMix(key.hashCode() ^ namespace.hashCode());
+	}
+
+	// Snapshotting ----------------------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the current version of this table. The version is increased each time the table arrays are
+	 * snapshotted.
+	 */
+	int getStateTableVersion() {
+		return stateTableVersion;
+	}
+
+	/**
+	 * Creates a snapshot of this {@link CopyOnWriteStateTable}, to be written in checkpointing. The snapshot integrity
+	 * is protected through copy-on-write from the {@link CopyOnWriteStateTable}. Users should call
+	 * {@link #releaseSnapshot(CopyOnWriteStateTableSnapshot)} after using the returned object, so that the table can
+	 * stop performing copy-on-write on behalf of the snapshot.
+	 *
+	 * @return a snapshot from this {@link CopyOnWriteStateTable}, for checkpointing.
+	 */
+	@Override
+	public CopyOnWriteStateTableSnapshot<K, N, S> createSnapshot() {
+		return new CopyOnWriteStateTableSnapshot<>(this);
+	}
+
+	/**
+	 * Releases a snapshot for this {@link CopyOnWriteStateTable}. This method should be called once a snapshot is no more needed,
+	 * so that the {@link CopyOnWriteStateTable} can stop considering this snapshot for copy-on-write, thus avoiding unnecessary
+	 * object creation.
+	 *
+	 * @param snapshotToRelease the snapshot to release, which was previously created by this state table.
+	 */
+	void releaseSnapshot(CopyOnWriteStateTableSnapshot<K, N, S> snapshotToRelease) {
+
+		Preconditions.checkArgument(snapshotToRelease.isOwner(this),
+				"Cannot release snapshot which is owned by a different state table.");
+
+		releaseSnapshot(snapshotToRelease.getSnapshotVersion());
+	}
+
+	// StateTableEntry -------------------------------------------------------------------------------------------------
+
+	/**
+	 * One entry in the {@link CopyOnWriteStateTable}. This is a triplet of key, namespace, and state. Thereby, key and
+	 * namespace together serve as a composite key for the state. This class also contains some management meta data for
+	 * copy-on-write, a pointer to link other {@link StateTableEntry}s to a list, and a cached hash code.
+	 *
+	 * @param <K> type of key.
+	 * @param <N> type of namespace.
+	 * @param <S> type of state.
+	 */
+	static class StateTableEntry<K, N, S> implements StateEntry<K, N, S> {
+
+		/**
+		 * The key. Assumed to be immutable and not null.
+		 */
+		final K key;
+
+		/**
+		 * The namespace. Assumed to be immutable and not null.
+		 */
+		final N namespace;
+
+		/**
+		 * The state. This is not final to allow exchanging the object for copy-on-write. Can be null.
+		 */
+		S state;
+
+		/**
+		 * Link to another {@link StateTableEntry}. This is used to resolve collisions in the
+		 * {@link CopyOnWriteStateTable} through chaining.
+		 */
+		StateTableEntry<K, N, S> next;
+
+		/**
+		 * The version of this {@link StateTableEntry}. This is meta data for copy-on-write of the table structure.
+		 */
+		int entryVersion;
+
+		/**
+		 * The version of the state object in this entry. This is meta data for copy-on-write of the state object itself.
+		 */
+		int stateVersion;
+
+		/**
+		 * The computed secondary hash for the composite of key and namespace.
+		 */
+		final int hash;
+
+		StateTableEntry() {
+			this(null, null, null, 0, null, 0, 0);
+		}
+
+		StateTableEntry(StateTableEntry<K, N, S> other, int entryVersion) {
+			this(other.key, other.namespace, other.state, other.hash, other.next, entryVersion, other.stateVersion);
+		}
+
+		StateTableEntry(
+				K key,
+				N namespace,
+				S state,
+				int hash,
+				StateTableEntry<K, N, S> next,
+				int entryVersion,
+				int stateVersion) {
+			this.key = key;
+			this.namespace = namespace;
+			this.hash = hash;
+			this.next = next;
+			this.entryVersion = entryVersion;
+			this.state = state;
+			this.stateVersion = stateVersion;
+		}
+
+		public final void setState(S value, int mapVersion) {
+			// naturally, we can update the state version every time we replace the old state with a different object
+			if (value != state) {
+				this.state = value;
+				this.stateVersion = mapVersion;
+			}
+		}
+
+		@Override
+		public K getKey() {
+			return key;
+		}
+
+		@Override
+		public N getNamespace() {
+			return namespace;
+		}
+
+		@Override
+		public S getState() {
+			return state;
+		}
+
+		@Override
+		public final boolean equals(Object o) {
+			if (!(o instanceof CopyOnWriteStateTable.StateTableEntry)) {
+				return false;
+			}
+
+			StateEntry<?, ?, ?> e = (StateEntry<?, ?, ?>) o;
+			return e.getKey().equals(key)
+					&& e.getNamespace().equals(namespace)
+					&& Objects.equals(e.getState(), state);
+		}
+
+		@Override
+		public final int hashCode() {
+			return (key.hashCode() ^ namespace.hashCode()) ^ Objects.hashCode(state);
+		}
+
+		@Override
+		public final String toString() {
+			return "(" + key + "|" + namespace + ")=" + state;
+		}
+	}
+
+	// For testing  ----------------------------------------------------------------------------------------------------
+
+	@Override
+	public int sizeOfNamespace(Object namespace) {
+		int count = 0;
+		for (StateEntry<K, N, S> entry : this) {
+			if (null != entry && namespace.equals(entry.getNamespace())) {
+				++count;
+			}
+		}
+		return count;
+	}
+
+
+	// StateEntryIterator  ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Iterator over the entries in a {@link CopyOnWriteStateTable}.
+	 */
+	class StateEntryIterator implements Iterator<StateEntry<K, N, S>> {
+		private StateTableEntry<K, N, S>[] activeTable;
+		private int nextTablePosition;
+		private StateTableEntry<K, N, S> nextEntry;
+		private int expectedModCount = modCount;
+
+		StateEntryIterator() {
+			this.activeTable = primaryTable;
+			this.nextTablePosition = 0;
+			this.expectedModCount = modCount;
+			this.nextEntry = getBootstrapEntry();
+			advanceIterator();
+		}
+
+		private StateTableEntry<K, N, S> advanceIterator() {
+
+			StateTableEntry<K, N, S> entryToReturn = nextEntry;
+			StateTableEntry<K, N, S> next = entryToReturn.next;
+
+			// consider both sub-tables to cover the case of rehash
+			while (next == null) {
+
+				StateTableEntry<K, N, S>[] tab = activeTable;
+
+				while (nextTablePosition < tab.length) {
+					next = tab[nextTablePosition++];
+
+					if (next != null) {
+						nextEntry = next;
+						return entryToReturn;
+					}
+				}
+
+				if (activeTable == incrementalRehashTable) {
+					break;
+				}
+
+				activeTable = incrementalRehashTable;
+				nextTablePosition = 0;
+			}
+
+			nextEntry = next;
+			return entryToReturn;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return nextEntry != null;
+		}
+
+		@Override
+		public StateTableEntry<K, N, S> next() {
+			if (modCount != expectedModCount) {
+				throw new ConcurrentModificationException();
+			}
+
+			if (nextEntry == null) {
+				throw new NoSuchElementException();
+			}
+
+			return advanceIterator();
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Read-only iterator");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableSnapshot.java
new file mode 100644
index 0000000..db3b197
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableSnapshot.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import java.io.IOException;
+
+/**
+ * This class represents the snapshot of a {@link CopyOnWriteStateTable} and has a role in operator state checkpointing. Besides
+ * holding the {@link CopyOnWriteStateTable}s internal entries at the time of the snapshot, this class is also responsible for
+ * preparing and writing the state in the process of checkpointing.
+ * <p>
+ * IMPORTANT: Please notice that snapshot integrity of entries in this class relies on proper copy-on-write semantics
+ * through the {@link CopyOnWriteStateTable} that created the snapshot object, but all objects in this snapshot must be considered
+ * as READ-ONLY! The reason is that the objects held by this class may or may not be deep copies of original objects
+ * that may still be used in the {@link CopyOnWriteStateTable}. This depends for each entry on whether or not it was subject to
+ * copy-on-write operations by the {@link CopyOnWriteStateTable}. Phrased differently: the {@link CopyOnWriteStateTable} provides
+ * copy-on-write isolation for this snapshot, but this snapshot does not isolate modifications from the
+ * {@link CopyOnWriteStateTable}!
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+@Internal
+public class CopyOnWriteStateTableSnapshot<K, N, S>
+		extends AbstractStateTableSnapshot<K, N, S, CopyOnWriteStateTable<K, N, S>> {
+
+	/**
+	 * Version of the {@link CopyOnWriteStateTable} when this snapshot was created. This can be used to release the snapshot.
+	 */
+	private final int snapshotVersion;
+
+	/**
+	 * The number of entries in the {@link CopyOnWriteStateTable} at the time of creating this snapshot.
+	 */
+	private final int stateTableSize;
+
+	/**
+	 * The state table entries, as of the time this snapshot was created. Objects in this array may or may not be deep
+	 * copies of the current entries in the {@link CopyOnWriteStateTable} that created this snapshot. This depends for each entry
+	 * on whether or not it was subject to copy-on-write operations by the {@link CopyOnWriteStateTable}.
+	 */
+	private final CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshotData;
+
+	/**
+	 * Offsets for the individual key-groups. This is lazily created when the snapshot is grouped by key-group during
+	 * the process of writing this snapshot to an output as part of checkpointing.
+	 */
+	private int[] keyGroupOffsets;
+
+	/**
+	 * Creates a new {@link CopyOnWriteStateTableSnapshot}.
+	 *
+	 * @param owningStateTable the {@link CopyOnWriteStateTable} for which this object represents a snapshot.
+	 */
+	CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K, N, S> owningStateTable) {
+
+		super(owningStateTable);
+		this.snapshotData = owningStateTable.snapshotTableArrays();
+		this.snapshotVersion = owningStateTable.getStateTableVersion();
+		this.stateTableSize = owningStateTable.size();
+		this.keyGroupOffsets = null;
+	}
+
+	/**
+	 * Returns the internal version of the {@link CopyOnWriteStateTable} when this snapshot was created. This value must be used to
+	 * tell the {@link CopyOnWriteStateTable} when to release this snapshot.
+	 */
+	int getSnapshotVersion() {
+		return snapshotVersion;
+	}
+
+	/**
+	 * Partitions the snapshot data by key-group. The algorithm first builds a histogram for the distribution of keys
+	 * into key-groups. Then, the histogram is accumulated to obtain the boundaries of each key-group in an array.
+	 * Last, we use the accumulated counts as write position pointers for the key-group's bins when reordering the
+	 * entries by key-group. This operation is lazily performed before the first writing of a key-group.
+	 * <p>
+	 * As a possible future optimization, we could perform the repartitioning in-place, using a scheme similar to the
+	 * cuckoo cycles in cuckoo hashing. This can trade some performance for a smaller memory footprint.
+	 */
+	@SuppressWarnings("unchecked")
+	private void partitionEntriesByKeyGroup() {
+
+		// We only have to perform this step once before the first key-group is written
+		if (null != keyGroupOffsets) {
+			return;
+		}
+
+		final KeyGroupRange keyGroupRange = owningStateTable.keyContext.getKeyGroupRange();
+		final int totalKeyGroups = owningStateTable.keyContext.getNumberOfKeyGroups();
+		final int baseKgIdx = keyGroupRange.getStartKeyGroup();
+		final int[] histogram = new int[keyGroupRange.getNumberOfKeyGroups() + 1];
+
+		CopyOnWriteStateTable.StateTableEntry<K, N, S>[] unfold = new CopyOnWriteStateTable.StateTableEntry[stateTableSize];
+
+		// 1) In this step we i) 'unfold' the linked list of entries to a flat array and ii) build a histogram for key-groups
+		int unfoldIndex = 0;
+		for (CopyOnWriteStateTable.StateTableEntry<K, N, S> entry : snapshotData) {
+			while (null != entry) {
+				int effectiveKgIdx =
+						KeyGroupRangeAssignment.computeKeyGroupForKeyHash(entry.key.hashCode(), totalKeyGroups) - baseKgIdx + 1;
+				++histogram[effectiveKgIdx];
+				unfold[unfoldIndex++] = entry;
+				entry = entry.next;
+			}
+		}
+
+		// 2) We accumulate the histogram bins to obtain key-group ranges in the final array
+		for (int i = 1; i < histogram.length; ++i) {
+			histogram[i] += histogram[i - 1];
+		}
+
+		// 3) We repartition the entries by key-group, using the histogram values as write indexes
+		for (CopyOnWriteStateTable.StateTableEntry<K, N, S> t : unfold) {
+			int effectiveKgIdx =
+					KeyGroupRangeAssignment.computeKeyGroupForKeyHash(t.key.hashCode(), totalKeyGroups) - baseKgIdx;
+			snapshotData[histogram[effectiveKgIdx]++] = t;
+		}
+
+		// 4) As a byproduct, we also created the key-group offsets
+		this.keyGroupOffsets = histogram;
+	}
+
+	@Override
+	public void release() {
+		owningStateTable.releaseSnapshot(this);
+	}
+
+	@Override
+	public void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException {
+
+		if (null == keyGroupOffsets) {
+			partitionEntriesByKeyGroup();
+		}
+
+		final CopyOnWriteStateTable.StateTableEntry<K, N, S>[] groupedOut = snapshotData;
+		KeyGroupRange keyGroupRange = owningStateTable.keyContext.getKeyGroupRange();
+		int keyGroupOffsetIdx = keyGroupId - keyGroupRange.getStartKeyGroup() - 1;
+		int startOffset = keyGroupOffsetIdx < 0 ? 0 : keyGroupOffsets[keyGroupOffsetIdx];
+		int endOffset = keyGroupOffsets[keyGroupOffsetIdx + 1];
+
+		TypeSerializer<K> keySerializer = owningStateTable.keyContext.getKeySerializer();
+		TypeSerializer<N> namespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer();
+		TypeSerializer<S> stateSerializer = owningStateTable.metaInfo.getStateSerializer();
+
+		// write number of mappings in key-group
+		dov.writeInt(endOffset - startOffset);
+
+		// write mappings
+		for (int i = startOffset; i < endOffset; ++i) {
+			CopyOnWriteStateTable.StateTableEntry<K, N, S> toWrite = groupedOut[i];
+			groupedOut[i] = null; // free asap for GC
+			namespaceSerializer.serialize(toWrite.namespace, dov);
+			keySerializer.serialize(toWrite.key, dov);
+			stateSerializer.serialize(toWrite.state, dov);
+		}
+	}
+
+	/**
+	 * Returns true iff the given state table is the owner of this snapshot object.
+	 */
+	boolean isOwner(CopyOnWriteStateTable<K, N, S> stateTable) {
+		return stateTable == owningStateTable;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapFoldingState.java
new file mode 100644
index 0000000..ad955c3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapFoldingState.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Heap-backed partitioned {@link FoldingState} that is
+ * snapshotted into files.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that can be folded into the state.
+ * @param <ACC> The type of the value in the folding state.
+ */
+public class HeapFoldingState<K, N, T, ACC>
+	extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
+	implements FoldingState<T, ACC> {
+
+	/** The function used to fold the state */
+	private final FoldTransformation<T, ACC> foldTransformation;
+
+	/**
+	 * Creates a new key/value state for the given hash map of key/value pairs.
+	 *
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 * @param stateTable The state table to use in this key/value state. May contain initial state.
+	 */
+	public HeapFoldingState(
+			FoldingStateDescriptor<T, ACC> stateDesc,
+			StateTable<K, N, ACC> stateTable,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer) {
+		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+		this.foldTransformation = new FoldTransformation<>(stateDesc);
+	}
+
+	// ------------------------------------------------------------------------
+	//  state access
+	// ------------------------------------------------------------------------
+
+	public ACC get() {
+		return stateTable.get(currentNamespace);
+	}
+
+	public void add(T value) throws IOException {
+
+		if (value == null) {
+			clear();
+			return;
+		}
+
+		try {
+			stateTable.transform(currentNamespace, value, foldTransformation);
+		} catch (Exception e) {
+			throw new IOException("Could not add value to folding state.", e);
+		}
+	}
+
+	static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
+
+		private final FoldingStateDescriptor<T, ACC> stateDescriptor;
+		private final FoldFunction<T, ACC> foldFunction;
+
+		FoldTransformation(FoldingStateDescriptor<T, ACC> stateDesc) {
+			this.stateDescriptor = Preconditions.checkNotNull(stateDesc);
+			this.foldFunction = Preconditions.checkNotNull(stateDesc.getFoldFunction());
+		}
+
+		@Override
+		public ACC apply(ACC previousState, T value) throws Exception {
+			return foldFunction.fold((previousState != null) ? previousState : stateDescriptor.getDefaultValue(), value);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapListState.java
new file mode 100644
index 0000000..ab5fff5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapListState.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+
+/**
+ * Heap-backed partitioned {@link ListState} that is snapshotted
+ * into files.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the value.
+ */
+public class HeapListState<K, N, V>
+	extends AbstractHeapMergingState<K, N, V, Iterable<V>, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
+	implements ListState<V> {
+
+	/**
+	 * Creates a new key/value state for the given hash map of key/value pairs.
+	 *
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 * @param stateTable The state table to use in this key/value state. May contain initial state.
+	 */
+	public HeapListState(
+			ListStateDescriptor<V> stateDesc,
+			StateTable<K, N, ArrayList<V>> stateTable,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer) {
+		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+	}
+
+	// ------------------------------------------------------------------------
+	//  state access
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Iterable<V> get() {
+		return stateTable.get(currentNamespace);
+	}
+
+	@Override
+	public void add(V value) {
+		final N namespace = currentNamespace;
+
+		if (value == null) {
+			clear();
+			return;
+		}
+
+		final StateTable<K, N, ArrayList<V>> map = stateTable;
+		ArrayList<V> list = map.get(namespace);
+
+		if (list == null) {
+			list = new ArrayList<>();
+			map.put(namespace, list);
+		}
+		list.add(value);
+	}
+
+	@Override
+	public byte[] getSerializedValue(K key, N namespace) throws Exception {
+		Preconditions.checkState(namespace != null, "No namespace given.");
+		Preconditions.checkState(key != null, "No key given.");
+
+		ArrayList<V> result = stateTable.get(key, namespace);
+
+		if (result == null) {
+			return null;
+		}
+
+		TypeSerializer<V> serializer = stateDesc.getSerializer();
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
+
+		// write the same as RocksDB writes lists, with one ',' separator
+		for (int i = 0; i < result.size(); i++) {
+			serializer.serialize(result.get(i), view);
+			if (i < result.size() -1) {
+				view.writeByte(',');
+			}
+		}
+		view.flush();
+
+		return baos.toByteArray();
+	}
+
+	// ------------------------------------------------------------------------
+	//  state merging
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) {
+		a.addAll(b);
+		return a;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapReducingState.java
new file mode 100644
index 0000000..b6eed74
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapReducingState.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Heap-backed partitioned {@link ReducingState} that is
+ * snapshotted into files.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the value.
+ */
+public class HeapReducingState<K, N, V>
+	extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>, ReducingStateDescriptor<V>>
+	implements ReducingState<V> {
+
+	private final ReduceTransformation<V> reduceTransformation;
+
+	/**
+	 * Creates a new key/value state for the given hash map of key/value pairs.
+	 *
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 * @param stateTable The state table to use in this key/value state. May contain initial state.
+	 */
+	public HeapReducingState(
+			ReducingStateDescriptor<V> stateDesc,
+			StateTable<K, N, V> stateTable,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer) {
+
+		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+		this.reduceTransformation = new ReduceTransformation<>(stateDesc.getReduceFunction());
+	}
+
+	// ------------------------------------------------------------------------
+	//  state access
+	// ------------------------------------------------------------------------
+
+	@Override
+	public V get() {
+		return stateTable.get(currentNamespace);
+	}
+
+	@Override
+	public void add(V value) throws IOException {
+
+		if (value == null) {
+			clear();
+			return;
+		}
+
+		try {
+			stateTable.transform(currentNamespace, value, reduceTransformation);
+		} catch (Exception e) {
+			throw new IOException("Exception while applying ReduceFunction in reducing state", e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  state merging
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected V mergeState(V a, V b) throws Exception {
+		return reduceTransformation.apply(a, b);
+	}
+
+	static final class ReduceTransformation<V> implements StateTransformationFunction<V, V> {
+
+		private final ReduceFunction<V> reduceFunction;
+
+		ReduceTransformation(ReduceFunction<V> reduceFunction) {
+			this.reduceFunction = Preconditions.checkNotNull(reduceFunction);
+		}
+
+		@Override
+		public V apply(V previousState, V value) throws Exception {
+			return previousState != null ? reduceFunction.reduce(previousState, value) : value;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapValueState.java
new file mode 100644
index 0000000..436c20e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapValueState.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Heap-backed partitioned {@link ValueState} that is snapshotted
+ * into files.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the value.
+ */
+public class HeapValueState<K, N, V>
+	extends AbstractHeapState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
+	implements ValueState<V> {
+
+	/**
+	 * Creates a new key/value state for the given hash map of key/value pairs.
+	 *
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 * @param stateTable The state table to use in this key/value state. May contain initial state.
+	 */
+	public HeapValueState(
+			ValueStateDescriptor<V> stateDesc,
+			StateTable<K, N, V> stateTable,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer) {
+		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+	}
+
+	@Override
+	public V value() {
+		final V result = stateTable.get(currentNamespace);
+
+		if (result == null) {
+			return stateDesc.getDefaultValue();
+		}
+
+		return result;
+	}
+
+	@Override
+	public void update(V value) {
+
+		if (value == null) {
+			clear();
+			return;
+		}
+
+		stateTable.put(currentNamespace, value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/InternalKeyContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/InternalKeyContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/InternalKeyContext.java
new file mode 100644
index 0000000..bf988ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/InternalKeyContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+
+/**
+ * This interface is the current context of a keyed state. It provides information about the currently selected key in
+ * the context, the corresponding key-group, and other key and key-grouping related information.
+ * <p>
+ * The typical use case for this interface is providing a view on the current-key selection aspects of
+ * {@link org.apache.flink.runtime.state.KeyedStateBackend}.
+ */
+@Internal
+public interface InternalKeyContext<K> {
+
+	/**
+	 * Returns the currently selected key. Used by states to access the current key.
+	 */
+	K getCurrentKey();
+
+	/**
+	 * Returns the index of the key-group to which the current key belongs.
+	 */
+	int getCurrentKeyGroupIndex();
+
+	/**
+	 * Returns the number of key-groups, also known as the max parallelism.
+	 */
+	int getNumberOfKeyGroups();
+
+	/**
+	 * Returns the range of key-groups for this backend.
+	 */
+	KeyGroupRange getKeyGroupRange();
+
+	/**
+	 * Returns the {@link TypeSerializer} for the state backend key type.
+	 */
+	TypeSerializer<K> getKeySerializer();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateEntry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateEntry.java
new file mode 100644
index 0000000..d32e825
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateEntry.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+/**
+ * Interface of entries in a state table. Entries are triples of key, namespace, and state.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+public interface StateEntry<K, N, S> {
+
+	/**
+	 * Returns the key of this entry.
+	 */
+	K getKey();
+
+	/**
+	 * Returns the namespace of this entry.
+	 */
+	N getNamespace();
+
+	/**
+	 * Returns the state of this entry.
+	 */
+	S getState();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTable.java
new file mode 100644
index 0000000..c1db7e8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTable.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class for state tables. Accesses to state are typically scoped by the currently active key, as provided
+ * through the {@link InternalKeyContext}.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public abstract class StateTable<K, N, S> {
+
+	/**
+	 * The key context view on the backend. This provides information, such as the currently active key.
+	 */
+	protected final InternalKeyContext<K> keyContext;
+
+	/**
+	 * Combined meta information such as name and serializers for this state.
+	 */
+	protected RegisteredBackendStateMetaInfo<N, S> metaInfo;
+
+	/**
+	 * Creates a state table for the given key context and meta information. Neither argument may be null.
+	 * @param keyContext the key context provides the key scope for all put/get/delete operations.
+	 * @param metaInfo the meta information, including the type serializer for state copy-on-write.
+	 */
+	public StateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+		this.keyContext = Preconditions.checkNotNull(keyContext);
+		this.metaInfo = Preconditions.checkNotNull(metaInfo);
+	}
+
+	// Main interface methods of StateTable -------------------------------------------------------
+
+	/**
+	 * Returns whether this {@link StateTable} is empty.
+	 *
+	 * @return {@code true} if this {@link StateTable} has no elements, {@code false}
+	 * otherwise.
+	 * @see #size()
+	 */
+	public boolean isEmpty() {
+		return size() == 0;
+	}
+
+	/**
+	 * Returns the total number of entries in this {@link StateTable}.
+	 *
+	 * @return the number of entries in this {@link StateTable}.
+	 */
+	public abstract int size();
+
+	/**
+	 * Returns the state of the mapping for the composite of active key and given namespace.
+	 *
+	 * @param namespace the namespace. Not null.
+	 * @return the state of the mapping with the specified key/namespace composite key, or {@code null}
+	 * if no mapping for the specified key is found.
+	 */
+	public abstract S get(N namespace);
+
+	/**
+	 * Returns whether this table contains a mapping for the composite of active key and given namespace.
+	 *
+	 * @param namespace the namespace in the composite key to search for. Not null.
+	 * @return {@code true} if this map contains the specified key/namespace composite key,
+	 * {@code false} otherwise.
+	 */
+	public abstract boolean containsKey(N namespace);
+
+	/**
+	 * Maps the composite of active key and given namespace to the specified state. This method should be preferred
+	 * over {@link #putAndGetOld(Object, Object)} when the caller is not interested in the old state.
+	 *
+	 * @param namespace the namespace. Not null.
+	 * @param state     the state. Can be null.
+	 */
+	public abstract void put(N namespace, S state);
+
+	/**
+	 * Maps the composite of active key and given namespace to the specified state. Returns the previous state that
+	 * was registered under the composite key.
+	 *
+	 * @param namespace the namespace. Not null.
+	 * @param state     the state. Can be null.
+	 * @return the state of any previous mapping with the specified key or
+	 * {@code null} if there was no such mapping.
+	 */
+	public abstract S putAndGetOld(N namespace, S state);
+
+	/**
+	 * Removes the mapping for the composite of active key and given namespace. This method should be preferred
+	 * over {@link #removeAndGetOld(Object)} when the caller is not interested in the old state.
+	 *
+	 * @param namespace the namespace of the mapping to remove. Not null.
+	 */
+	public abstract void remove(N namespace);
+
+	/**
+	 * Removes the mapping for the composite of active key and given namespace, returning the state that was
+	 * found under the entry.
+	 *
+	 * @param namespace the namespace of the mapping to remove. Not null.
+	 * @return the state of the removed mapping or {@code null} if no mapping
+	 * for the specified key was found.
+	 */
+	public abstract S removeAndGetOld(N namespace);
+
+	/**
+	 * Applies the given {@link StateTransformationFunction} to the state (1st input argument), using the given value as
+	 * second input argument. The result of {@link StateTransformationFunction#apply(Object, Object)} is then stored as
+	 * the new state. This function is basically an optimization for the get-update-put pattern.
+	 *
+	 * @param namespace      the namespace. Not null.
+	 * @param value          the value to use in transforming the state. Can be null.
+	 * @param transformation the transformation function.
+	 * @throws Exception if some exception happens in the transformation function.
+	 */
+	public abstract <T> void transform(
+			N namespace,
+			T value,
+			StateTransformationFunction<S, T> transformation) throws Exception;
+
+	// For queryable state ------------------------------------------------------------------------
+
+	/**
+	 * Returns the state for the composite of the given key and given namespace. This is typically used by
+	 * queryable state.
+	 *
+	 * @param key       the key. Not null.
+	 * @param namespace the namespace. Not null.
+	 * @return the state of the mapping with the specified key/namespace composite key, or {@code null}
+	 * if no mapping for the specified key is found.
+	 */
+	public abstract S get(K key, N namespace);
+
+	// Meta data setter / getter and toString -----------------------------------------------------
+
+	public TypeSerializer<S> getStateSerializer() {
+		return metaInfo.getStateSerializer();
+	}
+
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return metaInfo.getNamespaceSerializer();
+	}
+
+	public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() {
+		return metaInfo;
+	}
+
+	public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+		this.metaInfo = metaInfo;
+	}
+
+	// Snapshot / Restore -------------------------------------------------------------------------
+
+	abstract StateTableSnapshot createSnapshot();
+
+	public abstract void put(K key, int keyGroup, N namespace, S state);
+
+	// For testing --------------------------------------------------------------------------------
+
+	@VisibleForTesting
+	public abstract int sizeOfNamespace(Object namespace);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReader.java
new file mode 100644
index 0000000..41f0abd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReader.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.IOException;
+
+/**
+ * Interface for state de-serialization into {@link StateTable}s by key-group.
+ */
+interface StateTableByKeyGroupReader {
+
+	/**
+	 * Read the data for the specified key-group from the input.
+	 *
+	 * @param div        the input
+	 * @param keyGroupId the key-group to read
+	 * @throws IOException on read related problems
+	 */
+	void readMappingsInKeyGroup(DataInputView div, int keyGroupId) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReaders.java
new file mode 100644
index 0000000..2b5f15a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReaders.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.IOException;
+
+/**
+ * This class provides a static factory method to create different implementations of {@link StateTableByKeyGroupReader}
+ * depending on the provided serialization format version.
+ * <p>
+ * The implementations are also located here as static nested classes.
+ */
+class StateTableByKeyGroupReaders {
+
+	/**
+	 * Creates a new StateTableByKeyGroupReader that inserts de-serialized mappings into the given table, using the
+	 * de-serialization algorithm that matches the given version.
+	 *
+	 * @param table the {@link StateTable} into which de-serialized mappings are inserted.
+	 * @param version version for the de-serialization algorithm; 1 and 2 are supported, any other value is rejected with an {@link IllegalArgumentException}.
+	 * @param <K> type of key.
+	 * @param <N> type of namespace.
+	 * @param <S> type of state.
+	 * @return the appropriate reader.
+	 */
+	static <K, N, S> StateTableByKeyGroupReader readerForVersion(StateTable<K, N, S> table, int version) {
+		switch (version) {
+			case 1:
+				return new StateTableByKeyGroupReaderV1<>(table);
+			case 2:
+				return new StateTableByKeyGroupReaderV2<>(table);
+			default:
+				throw new IllegalArgumentException("Unknown version: " + version);
+		}
+	}
+
+	static abstract class AbstractStateTableByKeyGroupReader<K, N, S>
+			implements StateTableByKeyGroupReader {
+
+		protected final StateTable<K, N, S> stateTable;
+
+		AbstractStateTableByKeyGroupReader(StateTable<K, N, S> stateTable) {
+			this.stateTable = stateTable;
+		}
+
+		@Override
+		public abstract void readMappingsInKeyGroup(DataInputView div, int keyGroupId) throws IOException;
+
+		protected TypeSerializer<K> getKeySerializer() {
+			return stateTable.keyContext.getKeySerializer();
+		}
+
+		protected TypeSerializer<N> getNamespaceSerializer() {
+			return stateTable.getNamespaceSerializer();
+		}
+
+		protected TypeSerializer<S> getStateSerializer() {
+			return stateTable.getStateSerializer();
+		}
+	}
+
+	static final class StateTableByKeyGroupReaderV1<K, N, S>
+			extends AbstractStateTableByKeyGroupReader<K, N, S> {
+
+		StateTableByKeyGroupReaderV1(StateTable<K, N, S> stateTable) {
+			super(stateTable);
+		}
+
+		@Override
+		public void readMappingsInKeyGroup(DataInputView inView, int keyGroupId) throws IOException {
+			// NOTE(review): a leading 0 byte presumably marks a key-group without data - confirm against the V1 writer.
+			if (inView.readByte() == 0) {
+				return;
+			}
+
+			final TypeSerializer<K> keySerializer = getKeySerializer();
+			final TypeSerializer<N> namespaceSerializer = getNamespaceSerializer();
+			final TypeSerializer<S> stateSerializer = getStateSerializer();
+
+			// V1 uses a kind of namespace-compressed format: each namespace is read once, followed by its key/state entries
+			int numNamespaces = inView.readInt();
+			for (int k = 0; k < numNamespaces; k++) {
+				N namespace = namespaceSerializer.deserialize(inView);
+				int numEntries = inView.readInt();
+				for (int l = 0; l < numEntries; l++) {
+					K key = keySerializer.deserialize(inView);
+					S state = stateSerializer.deserialize(inView);
+					stateTable.put(key, keyGroupId, namespace, state);
+				}
+			}
+		}
+	}
+
+	private static final class StateTableByKeyGroupReaderV2<K, N, S>
+			extends AbstractStateTableByKeyGroupReader<K, N, S> {
+
+		StateTableByKeyGroupReaderV2(StateTable<K, N, S> stateTable) {
+			super(stateTable);
+		}
+
+		@Override
+		public void readMappingsInKeyGroup(DataInputView inView, int keyGroupId) throws IOException {
+
+			final TypeSerializer<K> keySerializer = getKeySerializer();
+			final TypeSerializer<N> namespaceSerializer = getNamespaceSerializer();
+			final TypeSerializer<S> stateSerializer = getStateSerializer();
+
+			int numKeys = inView.readInt();
+			for (int i = 0; i < numKeys; ++i) {
+				N namespace = namespaceSerializer.deserialize(inView);
+				K key = keySerializer.deserialize(inView);
+				S state = stateSerializer.deserialize(inView);
+				stateTable.put(key, keyGroupId, namespace, state);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableSnapshot.java
new file mode 100644
index 0000000..184cd59
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableSnapshot.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Interface for the snapshots of a {@link StateTable}. Offers a way to serialize the snapshot (by key-group). All
+ * snapshots should be released after usage.
+ */
+interface StateTableSnapshot {
+
+	/**
+	 * Writes the data for the specified key-group to the output.
+	 *
+	 * @param dov the output
+	 * @param keyGroupId the key-group to write
+	 * @throws IOException on write related problems
+	 */
+	void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException;
+
+	/**
+	 * Release the snapshot. All snapshots should be released when they are no longer used because some implementations
+	 * can only release resources after a release.
+	 */
+	void release();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/async/AsyncMemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/async/AsyncMemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/async/AsyncMemoryStateBackend.java
new file mode 100644
index 0000000..54a208a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/async/AsyncMemoryStateBackend.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory.async;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.heap.async.AsyncHeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import java.io.IOException;
+
+/**
+ * An {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no
+ * capabilities to spill to disk. Checkpoints are serialized and the serialized data is
+ * transferred to the JobManager.
+ */
+public class AsyncMemoryStateBackend extends AbstractStateBackend {
+
+	private static final long serialVersionUID = 4109305377809414635L;
+
+	/** The default maximal size that the snapshotted memory state may have (5 MiBytes) */
+	private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
+
+	/** The maximal size that the snapshotted memory state may have */
+	private final int maxStateSize;
+
+	/**
+	 * Creates a new memory state backend that accepts states whose serialized forms are
+	 * up to the default state size (5 MiBytes).
+	 */
+	public AsyncMemoryStateBackend() {
+		this(DEFAULT_MAX_STATE_SIZE);
+	}
+
+	/**
+	 * Creates a new memory state backend that accepts states whose serialized forms are
+	 * up to the given number of bytes.
+	 *
+	 * @param maxStateSize The maximal size of the serialized state, in bytes
+	 */
+	public AsyncMemoryStateBackend(int maxStateSize) {
+		this.maxStateSize = maxStateSize;
+	}
+
+	@Override
+	public String toString() {
+		return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)";
+	}
+
+	@Override
+	public CheckpointStreamFactory createStreamFactory(
+			JobID jobId, String operatorIdentifier) throws IOException {
+		return new MemCheckpointStreamFactory(maxStateSize);
+	}
+
+	@Override
+	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+			Environment env, JobID jobID,
+			String operatorIdentifier,
+			TypeSerializer<K> keySerializer,
+			int numberOfKeyGroups,
+			KeyGroupRange keyGroupRange,
+			TaskKvStateRegistry kvStateRegistry) throws IOException {
+
+		return new AsyncHeapKeyedStateBackend<>(
+				kvStateRegistry,
+				keySerializer,
+				env.getUserClassLoader(),
+				numberOfKeyGroups,
+				keyGroupRange);
+	}
+}