You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/03/24 17:51:45 UTC

[3/3] flink git commit: [FLINK-5715] Asynchronous snapshots for heap-based keyed state backend (backport from 1.3)

[FLINK-5715] Asynchronous snapshots for heap-based keyed state backend (backport from 1.3)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6a80725
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6a80725
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6a80725

Branch: refs/heads/release-1.2
Commit: c6a80725053c49dd2064405577291bdc86c82003
Parents: b703a24
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Mar 23 11:36:56 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Mar 24 18:51:19 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/util/MathUtils.java   |   47 +-
 .../state/AbstractKeyedStateBackend.java        |   18 +-
 .../state/StateTransformationFunction.java      |   42 +
 .../filesystem/async/AsyncFsStateBackend.java   |  266 +++++
 .../heap/async/AbstractHeapMergingState.java    |  104 ++
 .../state/heap/async/AbstractHeapState.java     |  119 ++
 .../heap/async/AbstractStateTableSnapshot.java  |   51 +
 .../heap/async/AsyncHeapKeyedStateBackend.java  |  433 +++++++
 .../state/heap/async/CopyOnWriteStateTable.java | 1066 ++++++++++++++++++
 .../async/CopyOnWriteStateTableSnapshot.java    |  188 +++
 .../state/heap/async/HeapFoldingState.java      |   99 ++
 .../runtime/state/heap/async/HeapListState.java |  122 ++
 .../state/heap/async/HeapReducingState.java     |  107 ++
 .../state/heap/async/HeapValueState.java        |   73 ++
 .../state/heap/async/InternalKeyContext.java    |   60 +
 .../runtime/state/heap/async/StateEntry.java    |   44 +
 .../runtime/state/heap/async/StateTable.java    |  189 ++++
 .../heap/async/StateTableByKeyGroupReader.java  |   38 +
 .../heap/async/StateTableByKeyGroupReaders.java |  136 +++
 .../state/heap/async/StateTableSnapshot.java    |   45 +
 .../memory/async/AsyncMemoryStateBackend.java   |   94 ++
 .../state/AsyncFileStateBackendTest.java        |  213 ++++
 .../state/AsyncMemoryStateBackendTest.java      |  197 ++++
 .../runtime/state/MemoryStateBackendTest.java   |    2 +-
 .../runtime/state/StateBackendTestBase.java     |  150 +++
 .../heap/async/CopyOnWriteStateTableTest.java   |  486 ++++++++
 .../state/heap/async/HeapListStateTest.java     |  238 ++++
 .../state/heap/async/HeapReducingStateTest.java |  236 ++++
 .../heap/async/HeapStateBackendTestBase.java    |   37 +
 .../util/BlockerCheckpointStreamFactory.java    |  118 ++
 .../api/windowing/windows/TimeWindow.java       |   49 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   14 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   26 +
 ...ckendEventTimeWindowCheckpointingITCase.java |   26 +
 34 files changed, 5106 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index 074e8ae..4c52b6e 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -22,13 +22,13 @@ package org.apache.flink.util;
  * Collection of simple mathematical routines.
  */
 public final class MathUtils {
-	
+
 	/**
 	 * Computes the logarithm of the given value to the base of 2, rounded down. It corresponds to the
 	 * position of the highest non-zero bit. The position is counted, starting with 0 from the least
 	 * significant bit to the most significant bit. For example, <code>log2floor(16) = 4</code>, and
 	 * <code>log2floor(10) = 3</code>.
-	 * 
+	 *
 	 * @param value The value to compute the logarithm for.
 	 * @return The logarithm (rounded down) to the base of 2.
 	 * @throws ArithmeticException Thrown, if the given value is zero.
@@ -40,11 +40,11 @@ public final class MathUtils {
 
 		return 31 - Integer.numberOfLeadingZeros(value);
 	}
-	
+
 	/**
 	 * Computes the logarithm of the given value to the base of 2. This method throws an error,
 	 * if the given argument is not a power of 2.
-	 * 
+	 *
 	 * @param value The value to compute the logarithm for.
 	 * @return The logarithm to the base of 2.
 	 * @throws ArithmeticException Thrown, if the given value is zero.
@@ -59,25 +59,25 @@ public final class MathUtils {
 		}
 		return 31 - Integer.numberOfLeadingZeros(value);
 	}
-	
+
 	/**
 	 * Decrements the given number down to the closest power of two. If the argument is a
 	 * power of two, it remains unchanged.
-	 * 
+	 *
 	 * @param value The value to round down.
 	 * @return The closest value that is a power of two and less or equal than the given value.
 	 */
 	public static int roundDownToPowerOf2(int value) {
 		return Integer.highestOneBit(value);
 	}
-	
+
 	/**
 	 * Casts the given value to a 32 bit integer, if it can be safely done. If the cast would change the numeric
 	 * value, this method raises an exception.
 	 * <p>
 	 * This method is a protection in places where one expects to be able to safely case, but where unexpected
 	 * situations could make the cast unsafe and would cause hidden problems that are hard to track down.
-	 * 
+	 *
 	 * @param value The value to be cast to an integer.
 	 * @return The given value as an integer.
 	 * @see Math#toIntExact(long)
@@ -172,8 +172,37 @@ public final class MathUtils {
 		return x + 1;
 	}
 
+	/**
+	 * Pseudo-randomly maps a long (64-bit) to an integer (32-bit) using some bit-mixing for better distribution.
+	 *
+	 * @param in the long (64-bit)input.
+	 * @return the bit-mixed int (32-bit) output
+	 */
+	public static int longToIntWithBitMixing(long in) {
+		in = (in ^ (in >>> 30)) * 0xbf58476d1ce4e5b9L;
+		in = (in ^ (in >>> 27)) * 0x94d049bb133111ebL;
+		in = in ^ (in >>> 31);
+		return (int) in;
+	}
+
+	/**
+	 * Bit-mixing for pseudo-randomization of integers (e.g., to guard against bad hash functions). Implementation is
+	 * from Murmur's 32 bit finalizer.
+	 *
+	 * @param in the input value
+	 * @return the bit-mixed output value
+	 */
+	public static int bitMix(int in) {
+		in ^= in >>> 16;
+		in *= 0x85ebca6b;
+		in ^= in >>> 13;
+		in *= 0xc2b2ae35;
+		in ^= in >>> 16;
+		return in;
+	}
+
 	// ============================================================================================
-	
+
 	/**
 	 * Prevent Instantiation through private constructor.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 2daf896..23c9a49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -35,6 +35,8 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.heap.async.AbstractHeapMergingState;
+import org.apache.flink.runtime.state.heap.async.InternalKeyContext;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
@@ -51,7 +53,7 @@ import java.util.List;
  * @param <K> Type of the key by which state is keyed.
  */
 public abstract class AbstractKeyedStateBackend<K>
-		implements KeyedStateBackend<K>, Snapshotable<KeyGroupsStateHandle>, Closeable {
+		implements KeyedStateBackend<K>, Snapshotable<KeyGroupsStateHandle>, Closeable, InternalKeyContext<K> {
 
 	/** {@link TypeSerializer} for our key. */
 	protected final TypeSerializer<K> keySerializer;
@@ -205,6 +207,7 @@ public abstract class AbstractKeyedStateBackend<K>
 	/**
 	 * @see KeyedStateBackend
 	 */
+	@Override
 	public KeyGroupRange getKeyGroupRange() {
 		return keyGroupRange;
 	}
@@ -293,10 +296,16 @@ public abstract class AbstractKeyedStateBackend<K>
 	@Override
 	@SuppressWarnings("unchecked,rawtypes")
 	public <N, S extends MergingState<?, ?>> void mergePartitionedStates(final N target, Collection<N> sources, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
-		if (stateDescriptor instanceof ReducingStateDescriptor) {
+
+		State stateRef = getPartitionedState(target, namespaceSerializer, stateDescriptor);
+		if (stateRef instanceof AbstractHeapMergingState) {
+
+			((AbstractHeapMergingState) stateRef).mergeNamespaces(target, sources);
+		} else if (stateDescriptor instanceof ReducingStateDescriptor) {
+
 			ReducingStateDescriptor reducingStateDescriptor = (ReducingStateDescriptor) stateDescriptor;
+			ReducingState state = (ReducingState) stateRef;
 			ReduceFunction reduceFn = reducingStateDescriptor.getReduceFunction();
-			ReducingState state = (ReducingState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
 			KvState kvState = (KvState) state;
 			Object result = null;
 			for (N source: sources) {
@@ -314,7 +323,8 @@ public abstract class AbstractKeyedStateBackend<K>
 				state.add(result);
 			}
 		} else if (stateDescriptor instanceof ListStateDescriptor) {
-			ListState<Object> state = (ListState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
+
+			ListState<Object> state = (ListState) stateRef;
 			KvState kvState = (KvState) state;
 			List<Object> result = new ArrayList<>();
 			for (N source: sources) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
new file mode 100644
index 0000000..182b4c8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for a binary function that is used for push-down of state transformation into state backends. The
+ * function takes as inputs the old state and an element. From those inputs, the function computes the new state.
+ *
+ * @param <S> type of the previous state that is the bases for the computation of the new state.
+ * @param <T> type of the element value that is used to compute the change of state.
+ */
+@Internal
+public interface StateTransformationFunction<S, T> {
+
+	/**
+	 * Binary function that applies a given value to the given old state to compute the new state.
+	 *
+	 * @param previousState the previous state that is the basis for the transformation.
+	 * @param value         the value that the implementation applies to the old state to obtain the new state.
+	 * @return the new state, computed by applying the given value on the given old state.
+	 * @throws Exception if something goes wrong in applying the transformation function.
+	 */
+	S apply(S previousState, T value) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java
new file mode 100644
index 0000000..d90ffbd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem.async;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
+import org.apache.flink.runtime.state.heap.async.AsyncHeapKeyedStateBackend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * The file state backend is a state backend that stores the state of streaming jobs in a file system.
+ *
+ * <p>The state backend has one core directory into which it puts all checkpoint data. Inside that
+ * directory, it creates a directory per job, inside which each checkpoint gets a directory, with
+ * files for each state, for example:
+ *
+ * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
+ */
+public class AsyncFsStateBackend extends AbstractStateBackend {
+
+	private static final long serialVersionUID = -8191916350224044011L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(AsyncFsStateBackend.class);
+
+	/** By default, state smaller than 1024 bytes will not be written to files, but
+	 * will be stored directly with the metadata */
+	public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;
+
+	/** Maximum size of state that is stored with the metadata, rather than in files */
+	private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+	
+	/** The path to the directory for the checkpoint data, including the file system
+	 * description via scheme and optional authority */
+	private final Path basePath;
+
+	/** State below this size will be stored as part of the metadata, rather than in files */
+	private final int fileStateThreshold;
+	
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 */
+	public AsyncFsStateBackend(String checkpointDataUri) throws IOException {
+		this(new Path(checkpointDataUri));
+	}
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 */
+	public AsyncFsStateBackend(Path checkpointDataUri) throws IOException {
+		this(checkpointDataUri.toUri());
+	}
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 */
+	public AsyncFsStateBackend(URI checkpointDataUri) throws IOException {
+		this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD);
+	}
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
+	 * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
+	 *                             rather than in files
+	 * 
+	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 */
+	public AsyncFsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException {
+		if (fileStateSizeThreshold < 0) {
+			throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
+		}
+		if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) {
+			throw new IllegalArgumentException("The threshold for file state size cannot be larger than " +
+				MAX_FILE_STATE_THRESHOLD);
+		}
+		this.fileStateThreshold = fileStateSizeThreshold;
+		
+		this.basePath = validateAndNormalizeUri(checkpointDataUri);
+	}
+
+	/**
+	 * Gets the base directory where all state-containing files are stored.
+	 * The job specific directory is created inside this directory.
+	 *
+	 * @return The base directory.
+	 */
+	public Path getBasePath() {
+		return basePath;
+	}
+
+	// ------------------------------------------------------------------------
+	//  initialization and cleanup
+	// ------------------------------------------------------------------------
+
+	@Override
+	public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
+		return new FsCheckpointStreamFactory(basePath, jobId, fileStateThreshold);
+	}
+
+	@Override
+	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+			Environment env,
+			JobID jobID,
+			String operatorIdentifier,
+			TypeSerializer<K> keySerializer,
+			int numberOfKeyGroups,
+			KeyGroupRange keyGroupRange,
+			TaskKvStateRegistry kvStateRegistry) throws Exception {
+		return new AsyncHeapKeyedStateBackend<>(
+				kvStateRegistry,
+				keySerializer,
+				env.getUserClassLoader(),
+				numberOfKeyGroups,
+				keyGroupRange);
+	}
+
+	@Override
+	public String toString() {
+		return "File State Backend @ " + basePath;
+	}
+
+	/**
+	 * Checks and normalizes the checkpoint data URI. This method first checks the validity of the
+	 * URI (scheme, path, availability of a matching file system) and then normalizes the URI
+	 * to a path.
+	 * 
+	 * <p>If the URI does not include an authority, but the file system configured for the URI has an
+	 * authority, then the normalized path will include this authority.
+	 * 
+	 * @param checkpointDataUri The URI to check and normalize.
+	 * @return A normalized URI as a Path.
+	 * 
+	 * @throws IllegalArgumentException Thrown, if the URI misses scheme or path. 
+	 * @throws IOException Thrown, if no file system can be found for the URI's scheme.
+	 */
+	public static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException {
+		final String scheme = checkpointDataUri.getScheme();
+		final String path = checkpointDataUri.getPath();
+
+		// some validity checks
+		if (scheme == null) {
+			throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
+					"Please specify the file system scheme explicitly in the URI.");
+		}
+		if (path == null) {
+			throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
+					"Please specify a directory path for the checkpoint data.");
+		}
+		if (path.length() == 0 || path.equals("/")) {
+			throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
+		}
+
+		if (!FileSystem.isFlinkSupportedScheme(checkpointDataUri.getScheme())) {
+			// skip verification checks for non-flink supported filesystem
+			// this is because the required filesystem classes may not be available to the flink client
+			return new Path(checkpointDataUri);
+		} else {
+			// we do a bit of work to make sure that the URI for the filesystem refers to exactly the same
+			// (distributed) filesystem on all hosts and includes full host/port information, even if the
+			// original URI did not include that. We count on the filesystem loading from the configuration
+			// to fill in the missing data.
+
+			// try to grab the file system for this path/URI
+			FileSystem filesystem = FileSystem.get(checkpointDataUri);
+			if (filesystem == null) {
+				String reason = "Could not find a file system for the given scheme in" +
+				"the available configurations.";
+				LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " +
+						"problem or by the fact that the file system is not accessible from the " +
+						"client. Reason:{}", reason);
+				return new Path(checkpointDataUri);
+			}
+
+			URI fsURI = filesystem.getUri();
+			try {
+				URI baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), path, null, null);
+				return new Path(baseURI);
+			} catch (URISyntaxException e) {
+				String reason = String.format(
+						"Cannot create file system URI for checkpointDataUri %s and filesystem URI %s: " + e.toString(),
+						checkpointDataUri,
+						fsURI);
+				LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " +
+						"problem or by the fact that the file system is not accessible from the " +
+						"client. Reason: {}", reason);
+				return new Path(checkpointDataUri);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java
new file mode 100644
index 0000000..1b09d9c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+
+import java.util.Collection;
+
+/**
+ * Base class for {@link MergingState} that is stored on the heap.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <SV> The type of the values in the state.
+ * @param <S> The type of State
+ * @param <SD> The type of StateDescriptor for the State S
+ */
+public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends State, SD extends StateDescriptor<S, ?>>
+		extends AbstractHeapState<K, N, SV, S, SD> {
+
+	/**
+	 * The merge transformation function that implements the merge logic.
+	 */
+	private final MergeTransformation mergeTransformation;
+
+	/**
+	 * Creates a new key/value state for the given hash map of key/value pairs.
+	 *
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+	 */
+	protected AbstractHeapMergingState(
+			SD stateDesc,
+			StateTable<K, N, SV> stateTable,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer) {
+
+		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+		this.mergeTransformation = new MergeTransformation();
+	}
+
+	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+		if (sources == null || sources.isEmpty()) {
+			return; // nothing to do
+		}
+
+		final StateTable<K, N, SV> map = stateTable;
+
+		SV merged = null;
+
+		// merge the sources
+		for (N source : sources) {
+
+			// get and remove the next source per namespace/key
+			SV sourceState = map.removeAndGetOld(source);
+
+			if (merged != null && sourceState != null) {
+				merged = mergeState(merged, sourceState);
+			} else if (merged == null) {
+				merged = sourceState;
+			}
+		}
+
+		// merge into the target, if needed
+		if (merged != null) {
+			map.transform(target, merged, mergeTransformation);
+		}
+	}
+
+	protected abstract SV mergeState(SV a, SV b) throws Exception;
+
+	final class MergeTransformation implements StateTransformationFunction<SV, SV> {
+
+		@Override
+		public SV apply(SV targetState, SV merged) throws Exception {
+			if (targetState != null) {
+				return mergeState(targetState, merged);
+			} else {
+				return merged;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java
new file mode 100644
index 0000000..c93ea6a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class for partitioned {@link ListState} implementations that are backed by a regular
+ * heap hash map. The concrete implementations define how the state is checkpointed.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <SV> The type of the values in the state.
+ * @param <S> The type of State
+ * @param <SD> The type of StateDescriptor for the State S
+ */
+public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
+	implements KvState<N>, State {
+
+	/** Map containing the actual key/value pairs */
+	protected final StateTable<K, N, SV> stateTable;
+
+	/** This holds the name of the state and can create an initial default value for the state. */
+	protected final SD stateDesc;
+
+	/** The current namespace, which the access methods will refer to. */
+	protected N currentNamespace;
+
+	protected final TypeSerializer<K> keySerializer;
+
+	protected final TypeSerializer<N> namespaceSerializer;
+
+	/**
+	 * Creates a new key/value state for the given hash map of key/value pairs.
+	 *
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+	 */
+	protected AbstractHeapState(
+			SD stateDesc,
+			StateTable<K, N, SV> stateTable,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer) {
+
+		this.stateDesc = stateDesc;
+		this.stateTable = Preconditions.checkNotNull(stateTable, "State table must not be null.");
+		this.keySerializer = keySerializer;
+		this.namespaceSerializer = namespaceSerializer;
+		this.currentNamespace = null;
+	}
+
+	// ------------------------------------------------------------------------
+
+
+	public final void clear() {
+		stateTable.remove(currentNamespace);
+	}
+
+	public final void setCurrentNamespace(N namespace) {
+		this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace must not be null.");
+	}
+
+	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
+		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+
+		Tuple2<K, N> keyAndNamespace = KvStateRequestSerializer.deserializeKeyAndNamespace(
+				serializedKeyAndNamespace, keySerializer, namespaceSerializer);
+
+		return getSerializedValue(keyAndNamespace.f0, keyAndNamespace.f1);
+	}
+
+	public byte[] getSerializedValue(K key, N namespace) throws Exception {
+		Preconditions.checkState(namespace != null, "No namespace given.");
+		Preconditions.checkState(key != null, "No key given.");
+
+		SV result = stateTable.get(key, namespace);
+
+		if (result == null) {
+			return null;
+		}
+
+		@SuppressWarnings("unchecked,rawtypes")
+		TypeSerializer serializer = stateDesc.getSerializer();
+		return KvStateRequestSerializer.serializeValue(result, serializer);
+	}
+
+	/**
+	 * This should only be used for testing.
+	 */
+	@VisibleForTesting
+	public StateTable<K, N, SV> getStateTable() {
+		return stateTable;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java
new file mode 100644
index 0000000..8a1d3f3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Abstract class to encapsulate the logic to take snapshots of {@link StateTable} implementations and also defines how
+ * the snapshot is written during the serialization phase of checkpointing.
+ */
+@Internal
+abstract class AbstractStateTableSnapshot<K, N, S, T extends StateTable<K, N, S>> implements StateTableSnapshot {
+
+	/**
+	 * The {@link StateTable} from which this snapshot was created.
+	 */
+	final T owningStateTable;
+
+	/**
+	 * Creates a new {@link AbstractStateTableSnapshot} for and owned by the given table.
+	 *
+	 * @param owningStateTable the {@link StateTable} for which this object represents a snapshot.
+	 */
+	AbstractStateTableSnapshot(T owningStateTable) {
+		this.owningStateTable = Preconditions.checkNotNull(owningStateTable);
+	}
+
+	/**
+	 * Optional hook to release resources for this snapshot at the end of its lifecycle.
+	 */
+	@Override
+	public void release() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java
new file mode 100644
index 0000000..e19ed00
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.migration.MigrationUtil;
+import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
+import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and will serialize state to
+ * streams provided by a {@link CheckpointStreamFactory} upon
+ * checkpointing.
+ *
+ * @param <K> The key by which state is keyed.
+ */
+public class AsyncHeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AsyncHeapKeyedStateBackend.class);
+
+	/**
+	 * Map of state tables that stores all state of key/value states. We store it centrally so
+	 * that we can easily checkpoint/restore it.
+	 *
+	 * <p>The actual parameters of StateTable are {@code StateTable<NamespaceT, Map<KeyT, StateT>>}
+	 * but we can't put them here because different key/value states with different types and
+	 * namespace types share this central list of tables.
+	 */
+	private final HashMap<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
+
+	public AsyncHeapKeyedStateBackend(
+			TaskKvStateRegistry kvStateRegistry,
+			TypeSerializer<K> keySerializer,
+			ClassLoader userCodeClassLoader,
+			int numberOfKeyGroups,
+			KeyGroupRange keyGroupRange) {
+
+		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);
+		LOG.info("Initializing heap keyed state backend with stream factory.");
+	}
+
+	// ------------------------------------------------------------------------
+	//  state backend operations
+	// ------------------------------------------------------------------------
+
+	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+			TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
+
+		return tryRegisterStateTable(
+				stateDesc.getName(), stateDesc.getType(),
+				namespaceSerializer, stateDesc.getSerializer());
+	}
+
+	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+			String stateName,
+			StateDescriptor.Type stateType,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<V> valueSerializer) {
+
+		final RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
+				new RegisteredBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
+
+		@SuppressWarnings("unchecked")
+		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
+
+		if (stateTable == null) {
+			stateTable = newStateTable(newMetaInfo);
+			stateTables.put(stateName, stateTable);
+		} else {
+			if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
+				throw new RuntimeException("Trying to access state using incompatible meta info, was " +
+						stateTable.getMetaInfo() + " trying access with " + newMetaInfo);
+			}
+			stateTable.setMetaInfo(newMetaInfo);
+		}
+		return stateTable;
+	}
+
+	private boolean hasRegisteredState() {
+		return !stateTables.isEmpty();
+	}
+
+	@Override
+	public <N, V> ValueState<V> createValueState(
+		TypeSerializer<N> namespaceSerializer,
+		ValueStateDescriptor<V> stateDesc) throws Exception {
+
+		StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+		return new HeapValueState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+	}
+
+	@Override
+	public <N, T> ListState<T> createListState(
+		TypeSerializer<N> namespaceSerializer,
+		ListStateDescriptor<T> stateDesc) throws Exception {
+
+		// the list state does some manual mapping, because the state is typed to the generic
+		// 'List' interface, but we want to use an implementation typed to ArrayList
+		// using a more specialized implementation opens up runtime optimizations
+
+		StateTable<K, N, ArrayList<T>> stateTable = tryRegisterStateTable(
+			stateDesc.getName(),
+			stateDesc.getType(),
+			namespaceSerializer,
+			new ArrayListSerializer<T>(stateDesc.getSerializer()));
+
+		return new HeapListState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+	}
+
+	@Override
+	public <N, T> ReducingState<T> createReducingState(
+		TypeSerializer<N> namespaceSerializer,
+		ReducingStateDescriptor<T> stateDesc) throws Exception {
+
+		StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+		return new HeapReducingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+	}
+
+	@Override
+	public <N, T, ACC> FoldingState<T, ACC> createFoldingState(
+		TypeSerializer<N> namespaceSerializer,
+		FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+
+		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+		return new HeapFoldingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public  RunnableFuture<KeyGroupsStateHandle> snapshot(
+			final long checkpointId,
+			final long timestamp,
+			final CheckpointStreamFactory streamFactory) throws Exception {
+
+		if (!hasRegisteredState()) {
+			return DoneFuture.nullValue();
+		}
+
+		long syncStartTime = System.currentTimeMillis();
+
+		Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE,
+				"Too many KV-States: " + stateTables.size() +
+						". Currently at most " + Short.MAX_VALUE + " states are supported");
+
+		List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size());
+
+		final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
+
+		final Map<StateTable<K, ?, ?>, StateTableSnapshot> cowStateStableSnapshots = new HashedMap(stateTables.size());
+
+		for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+			RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
+			KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
+					metaInfo.getStateType(),
+					metaInfo.getName(),
+					metaInfo.getNamespaceSerializer(),
+					metaInfo.getStateSerializer());
+
+			metaInfoProxyList.add(metaInfoProxy);
+			kVStateToId.put(kvState.getKey(), kVStateToId.size());
+			StateTable<K, ?, ?> stateTable = kvState.getValue();
+			if (null != stateTable) {
+				cowStateStableSnapshots.put(stateTable, stateTable.createSnapshot());
+			}
+		}
+
+		final KeyedBackendSerializationProxy serializationProxy =
+				new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+
+		//--------------------------------------------------- this becomes the end of sync part
+
+		// implementation of the async IO operation, based on FutureTask
+		final AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
+				new AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
+
+					AtomicBoolean open = new AtomicBoolean(false);
+
+					@Override
+					public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
+						if (open.compareAndSet(false, true)) {
+							CheckpointStreamFactory.CheckpointStateOutputStream stream =
+									streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+							try {
+								cancelStreamRegistry.registerClosable(stream);
+								return stream;
+							} catch (Exception ex) {
+								open.set(false);
+								throw ex;
+							}
+						} else {
+							throw new IOException("Operation already opened.");
+						}
+					}
+
+					@Override
+					public KeyGroupsStateHandle performOperation() throws Exception {
+						long asyncStartTime = System.currentTimeMillis();
+						CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+						DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
+						serializationProxy.write(outView);
+
+						long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
+
+						for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
+							int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
+							keyGroupRangeOffsets[keyGroupPos] = stream.getPos();
+							outView.writeInt(keyGroupId);
+
+							for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+								outView.writeShort(kVStateToId.get(kvState.getKey()));
+								cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(outView, keyGroupId);
+							}
+						}
+
+						if (open.compareAndSet(true, false)) {
+							StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
+							KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
+							final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
+
+							LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.",
+								streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
+
+							return keyGroupsStateHandle;
+						} else {
+							throw new IOException("Checkpoint stream already closed.");
+						}
+					}
+
+					@Override
+					public void done(boolean canceled) {
+						if (open.compareAndSet(true, false)) {
+							CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+							if (null != stream) {
+								cancelStreamRegistry.unregisterClosable(stream);
+								IOUtils.closeQuietly(stream);
+							}
+						}
+						for (StateTableSnapshot snapshot : cowStateStableSnapshots.values()) {
+							snapshot.release();
+						}
+					}
+				};
+
+		AsyncStoppableTaskWithCallback<KeyGroupsStateHandle> task = AsyncStoppableTaskWithCallback.from(ioCallable);
+
+		LOG.info("Heap backend snapshot (" + streamFactory + ", synchronous part) in thread " +
+				Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms.");
+
+		return task;
+	}
+
+	@SuppressWarnings("deprecation")
+	@Override
+	public void restore(Collection<KeyGroupsStateHandle> restoredState) throws Exception {
+		LOG.info("Initializing heap keyed state backend from snapshot.");
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Restoring snapshot from state handles: {}.", restoredState);
+		}
+
+		if (MigrationUtil.isOldSavepointKeyedState(restoredState)) {
+			throw new UnsupportedOperationException(
+				"This async.HeapKeyedStateBackend does not support restore from old savepoints.");
+		} else {
+			restorePartitionedState(restoredState);
+		}
+	}
+
+	@SuppressWarnings({"unchecked"})
+	private void restorePartitionedState(Collection<KeyGroupsStateHandle> state) throws Exception {
+
+		final Map<Integer, String> kvStatesById = new HashMap<>();
+		int numRegisteredKvStates = 0;
+		stateTables.clear();
+
+		for (KeyGroupsStateHandle keyGroupsHandle : state) {
+
+			if (keyGroupsHandle == null) {
+				continue;
+			}
+
+			FSDataInputStream fsDataInputStream = keyGroupsHandle.openInputStream();
+			cancelStreamRegistry.registerClosable(fsDataInputStream);
+
+			try {
+				DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
+
+				KeyedBackendSerializationProxy serializationProxy =
+						new KeyedBackendSerializationProxy(userCodeClassLoader);
+
+				serializationProxy.read(inView);
+
+				List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
+						serializationProxy.getNamedStateSerializationProxies();
+
+				for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoSerializationProxy : metaInfoList) {
+
+					StateTable<K, ?, ?> stateTable = stateTables.get(metaInfoSerializationProxy.getStateName());
+
+					//important: only create a new table we did not already create it previously
+					if (null == stateTable) {
+
+						RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
+								new RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy);
+
+						stateTable = newStateTable(registeredBackendStateMetaInfo);
+						stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable);
+						kvStatesById.put(numRegisteredKvStates, metaInfoSerializationProxy.getStateName());
+						++numRegisteredKvStates;
+					}
+				}
+
+				for (Tuple2<Integer, Long> groupOffset : keyGroupsHandle.getGroupRangeOffsets()) {
+					int keyGroupIndex = groupOffset.f0;
+					long offset = groupOffset.f1;
+					fsDataInputStream.seek(offset);
+
+					int writtenKeyGroupIndex = inView.readInt();
+
+					Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
+							"Unexpected key-group in restore.");
+
+					for (int i = 0; i < metaInfoList.size(); i++) {
+						int kvStateId = inView.readShort();
+						StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId));
+
+						// Hardcoding 2 as version will lead to the right method for the
+						// serialization format. Due to th backport, we should keep this fix and do
+						// not allow restore from a different format.
+						StateTableByKeyGroupReader keyGroupReader =
+								StateTableByKeyGroupReaders.readerForVersion(
+										stateTable,
+										2);
+
+						keyGroupReader.readMappingsInKeyGroup(inView, keyGroupIndex);
+					}
+				}
+			} finally {
+				cancelStreamRegistry.unregisterClosable(fsDataInputStream);
+				IOUtils.closeQuietly(fsDataInputStream);
+			}
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "HeapKeyedStateBackend";
+	}
+
+	/**
+	 * Returns the total number of state entries across all keys/namespaces.
+	 */
+	@VisibleForTesting
+	@SuppressWarnings("unchecked")
+	public int numStateEntries() {
+		int sum = 0;
+		for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
+			sum += stateTable.size();
+		}
+		return sum;
+	}
+
+	/**
+	 * Returns the total number of state entries across all keys for the given namespace.
+	 */
+	@VisibleForTesting
+	public int numStateEntries(Object namespace) {
+		int sum = 0;
+		for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
+			sum += stateTable.sizeOfNamespace(namespace);
+		}
+		return sum;
+	}
+
+	private <N, V> StateTable<K, N, V> newStateTable(RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
+		return new CopyOnWriteStateTable<>(this, newMetaInfo);
+	}
+}