Posted to commits@flink.apache.org by sr...@apache.org on 2017/03/24 17:51:45 UTC
[3/3] flink git commit: [FLINK-5715] Asynchronous snapshots for heap-based keyed state backend (backport from 1.3)
[FLINK-5715] Asynchronous snapshots for heap-based keyed state backend (backport from 1.3)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6a80725
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6a80725
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6a80725
Branch: refs/heads/release-1.2
Commit: c6a80725053c49dd2064405577291bdc86c82003
Parents: b703a24
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Mar 23 11:36:56 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Mar 24 18:51:19 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/util/MathUtils.java | 47 +-
.../state/AbstractKeyedStateBackend.java | 18 +-
.../state/StateTransformationFunction.java | 42 +
.../filesystem/async/AsyncFsStateBackend.java | 266 +++++
.../heap/async/AbstractHeapMergingState.java | 104 ++
.../state/heap/async/AbstractHeapState.java | 119 ++
.../heap/async/AbstractStateTableSnapshot.java | 51 +
.../heap/async/AsyncHeapKeyedStateBackend.java | 433 +++++++
.../state/heap/async/CopyOnWriteStateTable.java | 1066 ++++++++++++++++++
.../async/CopyOnWriteStateTableSnapshot.java | 188 +++
.../state/heap/async/HeapFoldingState.java | 99 ++
.../runtime/state/heap/async/HeapListState.java | 122 ++
.../state/heap/async/HeapReducingState.java | 107 ++
.../state/heap/async/HeapValueState.java | 73 ++
.../state/heap/async/InternalKeyContext.java | 60 +
.../runtime/state/heap/async/StateEntry.java | 44 +
.../runtime/state/heap/async/StateTable.java | 189 ++++
.../heap/async/StateTableByKeyGroupReader.java | 38 +
.../heap/async/StateTableByKeyGroupReaders.java | 136 +++
.../state/heap/async/StateTableSnapshot.java | 45 +
.../memory/async/AsyncMemoryStateBackend.java | 94 ++
.../state/AsyncFileStateBackendTest.java | 213 ++++
.../state/AsyncMemoryStateBackendTest.java | 197 ++++
.../runtime/state/MemoryStateBackendTest.java | 2 +-
.../runtime/state/StateBackendTestBase.java | 150 +++
.../heap/async/CopyOnWriteStateTableTest.java | 486 ++++++++
.../state/heap/async/HeapListStateTest.java | 238 ++++
.../state/heap/async/HeapReducingStateTest.java | 236 ++++
.../heap/async/HeapStateBackendTestBase.java | 37 +
.../util/BlockerCheckpointStreamFactory.java | 118 ++
.../api/windowing/windows/TimeWindow.java | 49 +-
...tractEventTimeWindowCheckpointingITCase.java | 14 +-
...ckendEventTimeWindowCheckpointingITCase.java | 26 +
...ckendEventTimeWindowCheckpointingITCase.java | 26 +
34 files changed, 5106 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
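For orientation, a minimal sketch of how the backported asynchronous backends added by this commit could be enabled on release-1.2. The checkpoint path and interval are placeholders, and the wiring assumes the usual StreamExecutionEnvironment API rather than anything introduced here:

import org.apache.flink.runtime.state.filesystem.async.AsyncFsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncBackendUsageSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint every 10 seconds; the interval is an arbitrary example value
        env.enableCheckpointing(10000);
        // keyed heap state is now snapshotted by a background thread instead of
        // blocking the processing loop during checkpoints
        env.setStateBackend(new AsyncFsStateBackend("file:///tmp/flink-checkpoints"));
    }
}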
http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index 074e8ae..4c52b6e 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -22,13 +22,13 @@ package org.apache.flink.util;
* Collection of simple mathematical routines.
*/
public final class MathUtils {
-
+
/**
* Computes the logarithm of the given value to the base of 2, rounded down. It corresponds to the
* position of the highest non-zero bit. The position is counted, starting with 0 from the least
* significant bit to the most significant bit. For example, <code>log2floor(16) = 4</code>, and
* <code>log2floor(10) = 3</code>.
- *
+ *
* @param value The value to compute the logarithm for.
* @return The logarithm (rounded down) to the base of 2.
* @throws ArithmeticException Thrown, if the given value is zero.
@@ -40,11 +40,11 @@ public final class MathUtils {
return 31 - Integer.numberOfLeadingZeros(value);
}
-
+
/**
* Computes the logarithm of the given value to the base of 2. This method throws an error,
* if the given argument is not a power of 2.
- *
+ *
* @param value The value to compute the logarithm for.
* @return The logarithm to the base of 2.
* @throws ArithmeticException Thrown, if the given value is zero.
@@ -59,25 +59,25 @@ public final class MathUtils {
}
return 31 - Integer.numberOfLeadingZeros(value);
}
-
+
/**
* Decrements the given number down to the closest power of two. If the argument is a
* power of two, it remains unchanged.
- *
+ *
* @param value The value to round down.
* @return The closest value that is a power of two and less or equal than the given value.
*/
public static int roundDownToPowerOf2(int value) {
return Integer.highestOneBit(value);
}
-
+
/**
* Casts the given value to a 32 bit integer, if it can be safely done. If the cast would change the numeric
* value, this method raises an exception.
* <p>
* This method is a protection in places where one expects to be able to safely cast, but where unexpected
* situations could make the cast unsafe and would cause hidden problems that are hard to track down.
- *
+ *
* @param value The value to be cast to an integer.
* @return The given value as an integer.
* @see Math#toIntExact(long)
@@ -172,8 +172,37 @@ public final class MathUtils {
return x + 1;
}
+ /**
+ * Pseudo-randomly maps a long (64-bit) to an integer (32-bit) using some bit-mixing for better distribution.
+ *
+ * @param in the long (64-bit) input.
+ * @return the bit-mixed int (32-bit) output
+ */
+ public static int longToIntWithBitMixing(long in) {
+ in = (in ^ (in >>> 30)) * 0xbf58476d1ce4e5b9L;
+ in = (in ^ (in >>> 27)) * 0x94d049bb133111ebL;
+ in = in ^ (in >>> 31);
+ return (int) in;
+ }
+
+ /**
+ * Bit-mixing for pseudo-randomization of integers (e.g., to guard against bad hash functions). Implementation is
+ * from Murmur's 32 bit finalizer.
+ *
+ * @param in the input value
+ * @return the bit-mixed output value
+ */
+ public static int bitMix(int in) {
+ in ^= in >>> 16;
+ in *= 0x85ebca6b;
+ in ^= in >>> 13;
+ in *= 0xc2b2ae35;
+ in ^= in >>> 16;
+ return in;
+ }
+
// ============================================================================================
-
+
/**
* Prevent Instantiation through private constructor.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 2daf896..23c9a49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -35,6 +35,8 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.heap.async.AbstractHeapMergingState;
+import org.apache.flink.runtime.state.heap.async.InternalKeyContext;
import org.apache.flink.util.Preconditions;
import java.io.Closeable;
@@ -51,7 +53,7 @@ import java.util.List;
* @param <K> Type of the key by which state is keyed.
*/
public abstract class AbstractKeyedStateBackend<K>
- implements KeyedStateBackend<K>, Snapshotable<KeyGroupsStateHandle>, Closeable {
+ implements KeyedStateBackend<K>, Snapshotable<KeyGroupsStateHandle>, Closeable, InternalKeyContext<K> {
/** {@link TypeSerializer} for our key. */
protected final TypeSerializer<K> keySerializer;
@@ -205,6 +207,7 @@ public abstract class AbstractKeyedStateBackend<K>
/**
* @see KeyedStateBackend
*/
+ @Override
public KeyGroupRange getKeyGroupRange() {
return keyGroupRange;
}
@@ -293,10 +296,16 @@ public abstract class AbstractKeyedStateBackend<K>
@Override
@SuppressWarnings("unchecked,rawtypes")
public <N, S extends MergingState<?, ?>> void mergePartitionedStates(final N target, Collection<N> sources, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
- if (stateDescriptor instanceof ReducingStateDescriptor) {
+
+ State stateRef = getPartitionedState(target, namespaceSerializer, stateDescriptor);
+ if (stateRef instanceof AbstractHeapMergingState) {
+
+ ((AbstractHeapMergingState) stateRef).mergeNamespaces(target, sources);
+ } else if (stateDescriptor instanceof ReducingStateDescriptor) {
+
ReducingStateDescriptor reducingStateDescriptor = (ReducingStateDescriptor) stateDescriptor;
+ ReducingState state = (ReducingState) stateRef;
ReduceFunction reduceFn = reducingStateDescriptor.getReduceFunction();
- ReducingState state = (ReducingState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
KvState kvState = (KvState) state;
Object result = null;
for (N source: sources) {
@@ -314,7 +323,8 @@ public abstract class AbstractKeyedStateBackend<K>
state.add(result);
}
} else if (stateDescriptor instanceof ListStateDescriptor) {
- ListState<Object> state = (ListState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
+
+ ListState<Object> state = (ListState) stateRef;
KvState kvState = (KvState) state;
List<Object> result = new ArrayList<>();
for (N source: sources) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
new file mode 100644
index 0000000..182b4c8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for a binary function that is used for push-down of state transformation into state backends. The
+ * function takes as inputs the old state and an element. From those inputs, the function computes the new state.
+ *
+ * @param <S> type of the previous state that is the basis for the computation of the new state.
+ * @param <T> type of the element value that is used to compute the change of state.
+ */
+@Internal
+public interface StateTransformationFunction<S, T> {
+
+ /**
+ * Binary function that applies a given value to the given old state to compute the new state.
+ *
+ * @param previousState the previous state that is the basis for the transformation.
+ * @param value the value that the implementation applies to the old state to obtain the new state.
+ * @return the new state, computed by applying the given value on the given old state.
+ * @throws Exception if something goes wrong in applying the transformation function.
+ */
+ S apply(S previousState, T value) throws Exception;
+}
\ No newline at end of file
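A hedged example of implementing the new interface (class name and value types are illustrative, not taken from this commit): the previous state is a running sum, possibly null, and each element is folded into it; this is the same shape the heap states below use to push updates into the StateTable.

import org.apache.flink.runtime.state.StateTransformationFunction;

public class SumTransformation implements StateTransformationFunction<Long, Long> {

    @Override
    public Long apply(Long previousState, Long value) {
        // a null previous state means "no state yet" for this key/namespace
        return previousState == null ? value : previousState + value;
    }
}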
http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java
new file mode 100644
index 0000000..d90ffbd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem.async;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
+import org.apache.flink.runtime.state.heap.async.AsyncHeapKeyedStateBackend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * The file state backend is a state backend that stores the state of streaming jobs in a file system.
+ *
+ * <p>The state backend has one core directory into which it puts all checkpoint data. Inside that
+ * directory, it creates a directory per job, inside which each checkpoint gets a directory, with
+ * files for each state, for example:
+ *
+ * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
+ */
+public class AsyncFsStateBackend extends AbstractStateBackend {
+
+ private static final long serialVersionUID = -8191916350224044011L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncFsStateBackend.class);
+
+ /** By default, state smaller than 1024 bytes will not be written to files, but
+ * will be stored directly with the metadata */
+ public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;
+
+ /** Maximum size of state that is stored with the metadata, rather than in files */
+ private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+
+ /** The path to the directory for the checkpoint data, including the file system
+ * description via scheme and optional authority */
+ private final Path basePath;
+
+ /** State below this size will be stored as part of the metadata, rather than in files */
+ private final int fileStateThreshold;
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to the checkpoint data directory.
+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */
+ public AsyncFsStateBackend(String checkpointDataUri) throws IOException {
+ this(new Path(checkpointDataUri));
+ }
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to the checkpoint data directory.
+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */
+ public AsyncFsStateBackend(Path checkpointDataUri) throws IOException {
+ this(checkpointDataUri.toUri());
+ }
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to the checkpoint data directory.
+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */
+ public AsyncFsStateBackend(URI checkpointDataUri) throws IOException {
+ this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD);
+ }
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to the checkpoint data directory.
+ * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
+ * rather than in files
+ *
+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */
+ public AsyncFsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException {
+ if (fileStateSizeThreshold < 0) {
+ throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
+ }
+ if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) {
+ throw new IllegalArgumentException("The threshold for file state size cannot be larger than " +
+ MAX_FILE_STATE_THRESHOLD);
+ }
+ this.fileStateThreshold = fileStateSizeThreshold;
+
+ this.basePath = validateAndNormalizeUri(checkpointDataUri);
+ }
+
+ /**
+ * Gets the base directory where all state-containing files are stored.
+ * The job specific directory is created inside this directory.
+ *
+ * @return The base directory.
+ */
+ public Path getBasePath() {
+ return basePath;
+ }
+
+ // ------------------------------------------------------------------------
+ // initialization and cleanup
+ // ------------------------------------------------------------------------
+
+ @Override
+ public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
+ return new FsCheckpointStreamFactory(basePath, jobId, fileStateThreshold);
+ }
+
+ @Override
+ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+ Environment env,
+ JobID jobID,
+ String operatorIdentifier,
+ TypeSerializer<K> keySerializer,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange,
+ TaskKvStateRegistry kvStateRegistry) throws Exception {
+ return new AsyncHeapKeyedStateBackend<>(
+ kvStateRegistry,
+ keySerializer,
+ env.getUserClassLoader(),
+ numberOfKeyGroups,
+ keyGroupRange);
+ }
+
+ @Override
+ public String toString() {
+ return "File State Backend @ " + basePath;
+ }
+
+ /**
+ * Checks and normalizes the checkpoint data URI. This method first checks the validity of the
+ * URI (scheme, path, availability of a matching file system) and then normalizes the URI
+ * to a path.
+ *
+ * <p>If the URI does not include an authority, but the file system configured for the URI has an
+ * authority, then the normalized path will include this authority.
+ *
+ * @param checkpointDataUri The URI to check and normalize.
+ * @return A normalized URI as a Path.
+ *
+ * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
+ * @throws IOException Thrown, if no file system can be found for the URI's scheme.
+ */
+ public static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException {
+ final String scheme = checkpointDataUri.getScheme();
+ final String path = checkpointDataUri.getPath();
+
+ // some validity checks
+ if (scheme == null) {
+ throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
+ "Please specify the file system scheme explicitly in the URI.");
+ }
+ if (path == null) {
+ throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
+ "Please specify a directory path for the checkpoint data.");
+ }
+ if (path.length() == 0 || path.equals("/")) {
+ throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
+ }
+
+ if (!FileSystem.isFlinkSupportedScheme(checkpointDataUri.getScheme())) {
+ // skip verification checks for non-flink supported filesystem
+ // this is because the required filesystem classes may not be available to the flink client
+ return new Path(checkpointDataUri);
+ } else {
+ // we do a bit of work to make sure that the URI for the filesystem refers to exactly the same
+ // (distributed) filesystem on all hosts and includes full host/port information, even if the
+ // original URI did not include that. We count on the filesystem loading from the configuration
+ // to fill in the missing data.
+
+ // try to grab the file system for this path/URI
+ FileSystem filesystem = FileSystem.get(checkpointDataUri);
+ if (filesystem == null) {
+ String reason = "Could not find a file system for the given scheme in " +
+ "the available configurations.";
+ LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " +
+ "problem or by the fact that the file system is not accessible from the " +
+ "client. Reason: {}", reason);
+ return new Path(checkpointDataUri);
+ }
+
+ URI fsURI = filesystem.getUri();
+ try {
+ URI baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), path, null, null);
+ return new Path(baseURI);
+ } catch (URISyntaxException e) {
+ String reason = String.format(
+ "Cannot create file system URI for checkpointDataUri %s and filesystem URI %s: " + e.toString(),
+ checkpointDataUri,
+ fsURI);
+ LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " +
+ "problem or by the fact that the file system is not accessible from the " +
+ "client. Reason: {}", reason);
+ return new Path(checkpointDataUri);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java
new file mode 100644
index 0000000..1b09d9c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+
+import java.util.Collection;
+
+/**
+ * Base class for {@link MergingState} that is stored on the heap.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <SV> The type of the values in the state.
+ * @param <S> The type of State
+ * @param <SD> The type of StateDescriptor for the State S
+ */
+public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends State, SD extends StateDescriptor<S, ?>>
+ extends AbstractHeapState<K, N, SV, S, SD> {
+
+ /**
+ * The merge transformation function that implements the merge logic.
+ */
+ private final MergeTransformation mergeTransformation;
+
+ /**
+ * Creates a new key/value state for the given hash map of key/value pairs.
+ *
+ * @param stateDesc The state identifier for the state. This contains the name
+ * and can create a default state value.
+ * @param stateTable The state table to use in this key/value state. May contain initial state.
+ */
+ protected AbstractHeapMergingState(
+ SD stateDesc,
+ StateTable<K, N, SV> stateTable,
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<N> namespaceSerializer) {
+
+ super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ this.mergeTransformation = new MergeTransformation();
+ }
+
+ public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+ if (sources == null || sources.isEmpty()) {
+ return; // nothing to do
+ }
+
+ final StateTable<K, N, SV> map = stateTable;
+
+ SV merged = null;
+
+ // merge the sources
+ for (N source : sources) {
+
+ // get and remove the next source per namespace/key
+ SV sourceState = map.removeAndGetOld(source);
+
+ if (merged != null && sourceState != null) {
+ merged = mergeState(merged, sourceState);
+ } else if (merged == null) {
+ merged = sourceState;
+ }
+ }
+
+ // merge into the target, if needed
+ if (merged != null) {
+ map.transform(target, merged, mergeTransformation);
+ }
+ }
+
+ protected abstract SV mergeState(SV a, SV b) throws Exception;
+
+ final class MergeTransformation implements StateTransformationFunction<SV, SV> {
+
+ @Override
+ public SV apply(SV targetState, SV merged) throws Exception {
+ if (targetState != null) {
+ return mergeState(targetState, merged);
+ } else {
+ return merged;
+ }
+ }
+ }
+}
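The merge above removes each source namespace entry, folds the values into one, and applies the result to the target through the table's transform hook. A plain-Java sketch of the same control flow, with a HashMap standing in for the StateTable (illustrative only, not part of this commit):

import java.util.Collection;
import java.util.Map;
import java.util.function.BinaryOperator;

final class MergeFlowSketch {

    // Mirrors mergeNamespaces(): remove every source entry, fold the values with a
    // merge function, then combine the result into the target slot (or insert it
    // if the target had no previous value), just like MergeTransformation.apply().
    static <N, V> void mergeNamespaces(
            Map<N, V> table, N target, Collection<N> sources, BinaryOperator<V> mergeFn) {

        V merged = null;
        for (N source : sources) {
            V sourceState = table.remove(source);            // removeAndGetOld(source)
            if (merged != null && sourceState != null) {
                merged = mergeFn.apply(merged, sourceState); // mergeState(a, b)
            } else if (merged == null) {
                merged = sourceState;
            }
        }
        if (merged != null) {
            table.merge(target, merged, mergeFn);            // transform(target, merged, ...)
        }
    }

    private MergeFlowSketch() {}
}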
http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java
new file mode 100644
index 0000000..c93ea6a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class for partitioned {@link State} implementations that are backed by a regular
+ * heap hash map. The concrete implementations define how the state is checkpointed.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <SV> The type of the values in the state.
+ * @param <S> The type of State
+ * @param <SD> The type of StateDescriptor for the State S
+ */
+public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
+ implements KvState<N>, State {
+
+ /** Map containing the actual key/value pairs */
+ protected final StateTable<K, N, SV> stateTable;
+
+ /** This holds the name of the state and can create an initial default value for the state. */
+ protected final SD stateDesc;
+
+ /** The current namespace, which the access methods will refer to. */
+ protected N currentNamespace;
+
+ protected final TypeSerializer<K> keySerializer;
+
+ protected final TypeSerializer<N> namespaceSerializer;
+
+ /**
+ * Creates a new key/value state for the given hash map of key/value pairs.
+ *
+ * @param stateDesc The state identifier for the state. This contains the name
+ * and can create a default state value.
+ * @param stateTable The state table to use in this key/value state. May contain initial state.
+ */
+ protected AbstractHeapState(
+ SD stateDesc,
+ StateTable<K, N, SV> stateTable,
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<N> namespaceSerializer) {
+
+ this.stateDesc = stateDesc;
+ this.stateTable = Preconditions.checkNotNull(stateTable, "State table must not be null.");
+ this.keySerializer = keySerializer;
+ this.namespaceSerializer = namespaceSerializer;
+ this.currentNamespace = null;
+ }
+
+ // ------------------------------------------------------------------------
+
+
+ public final void clear() {
+ stateTable.remove(currentNamespace);
+ }
+
+ public final void setCurrentNamespace(N namespace) {
+ this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace must not be null.");
+ }
+
+ public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
+ Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+
+ Tuple2<K, N> keyAndNamespace = KvStateRequestSerializer.deserializeKeyAndNamespace(
+ serializedKeyAndNamespace, keySerializer, namespaceSerializer);
+
+ return getSerializedValue(keyAndNamespace.f0, keyAndNamespace.f1);
+ }
+
+ public byte[] getSerializedValue(K key, N namespace) throws Exception {
+ Preconditions.checkState(namespace != null, "No namespace given.");
+ Preconditions.checkState(key != null, "No key given.");
+
+ SV result = stateTable.get(key, namespace);
+
+ if (result == null) {
+ return null;
+ }
+
+ @SuppressWarnings("unchecked,rawtypes")
+ TypeSerializer serializer = stateDesc.getSerializer();
+ return KvStateRequestSerializer.serializeValue(result, serializer);
+ }
+
+ /**
+ * This should only be used for testing.
+ */
+ @VisibleForTesting
+ public StateTable<K, N, SV> getStateTable() {
+ return stateTable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java
new file mode 100644
index 0000000..8a1d3f3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Abstract class that encapsulates the logic to take snapshots of {@link StateTable} implementations and defines how
+ * the snapshot is written during the serialization phase of checkpointing.
+ */
+@Internal
+abstract class AbstractStateTableSnapshot<K, N, S, T extends StateTable<K, N, S>> implements StateTableSnapshot {
+
+ /**
+ * The {@link StateTable} from which this snapshot was created.
+ */
+ final T owningStateTable;
+
+ /**
+ * Creates a new {@link AbstractStateTableSnapshot} for and owned by the given table.
+ *
+ * @param owningStateTable the {@link StateTable} for which this object represents a snapshot.
+ */
+ AbstractStateTableSnapshot(T owningStateTable) {
+ this.owningStateTable = Preconditions.checkNotNull(owningStateTable);
+ }
+
+ /**
+ * Optional hook to release resources for this snapshot at the end of its lifecycle.
+ */
+ @Override
+ public void release() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java
new file mode 100644
index 0000000..e19ed00
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.migration.MigrationUtil;
+import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
+import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An {@link AbstractKeyedStateBackend} that keeps state on the Java heap and serializes state to
+ * streams provided by a {@link CheckpointStreamFactory} upon
+ * checkpointing.
+ *
+ * @param <K> The key by which state is keyed.
+ */
+public class AsyncHeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncHeapKeyedStateBackend.class);
+
+ /**
+ * Map of state tables that stores all state of key/value states. We store it centrally so
+ * that we can easily checkpoint/restore it.
+ *
+ * <p>The actual type parameters of each StateTable are {@code StateTable<K, N, V>},
+ * but we can't put them here because different key/value states with different value and
+ * namespace types share this central map of tables.
+ */
+ private final HashMap<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
+
+ public AsyncHeapKeyedStateBackend(
+ TaskKvStateRegistry kvStateRegistry,
+ TypeSerializer<K> keySerializer,
+ ClassLoader userCodeClassLoader,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange) {
+
+ super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);
+ LOG.info("Initializing heap keyed state backend with stream factory.");
+ }
+
+ // ------------------------------------------------------------------------
+ // state backend operations
+ // ------------------------------------------------------------------------
+
+ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+ TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
+
+ return tryRegisterStateTable(
+ stateDesc.getName(), stateDesc.getType(),
+ namespaceSerializer, stateDesc.getSerializer());
+ }
+
+ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+ String stateName,
+ StateDescriptor.Type stateType,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<V> valueSerializer) {
+
+ final RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
+
+ @SuppressWarnings("unchecked")
+ StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
+
+ if (stateTable == null) {
+ stateTable = newStateTable(newMetaInfo);
+ stateTables.put(stateName, stateTable);
+ } else {
+ if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
+ throw new RuntimeException("Trying to access state using incompatible meta info, was " +
+ stateTable.getMetaInfo() + " trying access with " + newMetaInfo);
+ }
+ stateTable.setMetaInfo(newMetaInfo);
+ }
+ return stateTable;
+ }
+
+ private boolean hasRegisteredState() {
+ return !stateTables.isEmpty();
+ }
+
+ @Override
+ public <N, V> ValueState<V> createValueState(
+ TypeSerializer<N> namespaceSerializer,
+ ValueStateDescriptor<V> stateDesc) throws Exception {
+
+ StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+ return new HeapValueState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ }
+
+ @Override
+ public <N, T> ListState<T> createListState(
+ TypeSerializer<N> namespaceSerializer,
+ ListStateDescriptor<T> stateDesc) throws Exception {
+
+ // the list state does some manual mapping, because the state is typed to the generic
+ // 'List' interface, but we want to use an implementation typed to ArrayList
+ // using a more specialized implementation opens up runtime optimizations
+
+ StateTable<K, N, ArrayList<T>> stateTable = tryRegisterStateTable(
+ stateDesc.getName(),
+ stateDesc.getType(),
+ namespaceSerializer,
+ new ArrayListSerializer<T>(stateDesc.getSerializer()));
+
+ return new HeapListState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ }
+
+ @Override
+ public <N, T> ReducingState<T> createReducingState(
+ TypeSerializer<N> namespaceSerializer,
+ ReducingStateDescriptor<T> stateDesc) throws Exception {
+
+ StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+ return new HeapReducingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ }
+
+ @Override
+ public <N, T, ACC> FoldingState<T, ACC> createFoldingState(
+ TypeSerializer<N> namespaceSerializer,
+ FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+
+ StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+ return new HeapFoldingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public RunnableFuture<KeyGroupsStateHandle> snapshot(
+ final long checkpointId,
+ final long timestamp,
+ final CheckpointStreamFactory streamFactory) throws Exception {
+
+ if (!hasRegisteredState()) {
+ return DoneFuture.nullValue();
+ }
+
+ long syncStartTime = System.currentTimeMillis();
+
+ Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE,
+ "Too many KV-States: " + stateTables.size() +
+ ". Currently at most " + Short.MAX_VALUE + " states are supported");
+
+ List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size());
+
+ final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
+
+ final Map<StateTable<K, ?, ?>, StateTableSnapshot> cowStateStableSnapshots = new HashedMap(stateTables.size());
+
+ for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+ RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
+ KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
+ metaInfo.getStateType(),
+ metaInfo.getName(),
+ metaInfo.getNamespaceSerializer(),
+ metaInfo.getStateSerializer());
+
+ metaInfoProxyList.add(metaInfoProxy);
+ kVStateToId.put(kvState.getKey(), kVStateToId.size());
+ StateTable<K, ?, ?> stateTable = kvState.getValue();
+ if (null != stateTable) {
+ cowStateStableSnapshots.put(stateTable, stateTable.createSnapshot());
+ }
+ }
+
+ final KeyedBackendSerializationProxy serializationProxy =
+ new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+
+ //--------------------------------------------------- this becomes the end of sync part
+
+ // implementation of the async IO operation, based on FutureTask
+ final AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
+ new AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
+
+ AtomicBoolean open = new AtomicBoolean(false);
+
+ @Override
+ public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
+ if (open.compareAndSet(false, true)) {
+ CheckpointStreamFactory.CheckpointStateOutputStream stream =
+ streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+ try {
+ cancelStreamRegistry.registerClosable(stream);
+ return stream;
+ } catch (Exception ex) {
+ open.set(false);
+ throw ex;
+ }
+ } else {
+ throw new IOException("Operation already opened.");
+ }
+ }
+
+ @Override
+ public KeyGroupsStateHandle performOperation() throws Exception {
+ long asyncStartTime = System.currentTimeMillis();
+ CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+ DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
+ serializationProxy.write(outView);
+
+ long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
+
+ for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
+ int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
+ keyGroupRangeOffsets[keyGroupPos] = stream.getPos();
+ outView.writeInt(keyGroupId);
+
+ for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+ outView.writeShort(kVStateToId.get(kvState.getKey()));
+ cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(outView, keyGroupId);
+ }
+ }
+
+ if (open.compareAndSet(true, false)) {
+ StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
+ KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
+ final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
+
+ LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.",
+ streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
+
+ return keyGroupsStateHandle;
+ } else {
+ throw new IOException("Checkpoint stream already closed.");
+ }
+ }
+
+ @Override
+ public void done(boolean canceled) {
+ if (open.compareAndSet(true, false)) {
+ CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+ if (null != stream) {
+ cancelStreamRegistry.unregisterClosable(stream);
+ IOUtils.closeQuietly(stream);
+ }
+ }
+ for (StateTableSnapshot snapshot : cowStateStableSnapshots.values()) {
+ snapshot.release();
+ }
+ }
+ };
+
+ AsyncStoppableTaskWithCallback<KeyGroupsStateHandle> task = AsyncStoppableTaskWithCallback.from(ioCallable);
+
+ LOG.info("Heap backend snapshot (" + streamFactory + ", synchronous part) in thread " +
+ Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms.");
+
+ return task;
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void restore(Collection<KeyGroupsStateHandle> restoredState) throws Exception {
+ LOG.info("Initializing heap keyed state backend from snapshot.");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Restoring snapshot from state handles: {}.", restoredState);
+ }
+
+ if (MigrationUtil.isOldSavepointKeyedState(restoredState)) {
+ throw new UnsupportedOperationException(
+ "This async.HeapKeyedStateBackend does not support restore from old savepoints.");
+ } else {
+ restorePartitionedState(restoredState);
+ }
+ }
+
+ @SuppressWarnings({"unchecked"})
+ private void restorePartitionedState(Collection<KeyGroupsStateHandle> state) throws Exception {
+
+ final Map<Integer, String> kvStatesById = new HashMap<>();
+ int numRegisteredKvStates = 0;
+ stateTables.clear();
+
+ for (KeyGroupsStateHandle keyGroupsHandle : state) {
+
+ if (keyGroupsHandle == null) {
+ continue;
+ }
+
+ FSDataInputStream fsDataInputStream = keyGroupsHandle.openInputStream();
+ cancelStreamRegistry.registerClosable(fsDataInputStream);
+
+ try {
+ DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
+
+ KeyedBackendSerializationProxy serializationProxy =
+ new KeyedBackendSerializationProxy(userCodeClassLoader);
+
+ serializationProxy.read(inView);
+
+ List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
+ serializationProxy.getNamedStateSerializationProxies();
+
+ for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoSerializationProxy : metaInfoList) {
+
+ StateTable<K, ?, ?> stateTable = stateTables.get(metaInfoSerializationProxy.getStateName());
+
+ // important: only create a new table if we did not already create it previously
+ if (null == stateTable) {
+
+ RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy);
+
+ stateTable = newStateTable(registeredBackendStateMetaInfo);
+ stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable);
+ kvStatesById.put(numRegisteredKvStates, metaInfoSerializationProxy.getStateName());
+ ++numRegisteredKvStates;
+ }
+ }
+
+ for (Tuple2<Integer, Long> groupOffset : keyGroupsHandle.getGroupRangeOffsets()) {
+ int keyGroupIndex = groupOffset.f0;
+ long offset = groupOffset.f1;
+ fsDataInputStream.seek(offset);
+
+ int writtenKeyGroupIndex = inView.readInt();
+
+ Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
+ "Unexpected key-group in restore.");
+
+ for (int i = 0; i < metaInfoList.size(); i++) {
+ int kvStateId = inView.readShort();
+ StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId));
+
+ // Hardcoding 2 as the version selects the right reader for the
+ // serialization format. Due to the backport, we keep this fixed and do
+ // not allow restore from a different format.
+ StateTableByKeyGroupReader keyGroupReader =
+ StateTableByKeyGroupReaders.readerForVersion(
+ stateTable,
+ 2);
+
+ keyGroupReader.readMappingsInKeyGroup(inView, keyGroupIndex);
+ }
+ }
+ } finally {
+ cancelStreamRegistry.unregisterClosable(fsDataInputStream);
+ IOUtils.closeQuietly(fsDataInputStream);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "HeapKeyedStateBackend";
+ }
+
+ /**
+ * Returns the total number of state entries across all keys/namespaces.
+ */
+ @VisibleForTesting
+ @SuppressWarnings("unchecked")
+ public int numStateEntries() {
+ int sum = 0;
+ for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
+ sum += stateTable.size();
+ }
+ return sum;
+ }
+
+ /**
+ * Returns the total number of state entries across all keys for the given namespace.
+ */
+ @VisibleForTesting
+ public int numStateEntries(Object namespace) {
+ int sum = 0;
+ for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
+ sum += stateTable.sizeOfNamespace(namespace);
+ }
+ return sum;
+ }
+
+ private <N, V> StateTable<K, N, V> newStateTable(RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
+ return new CopyOnWriteStateTable<>(this, newMetaInfo);
+ }
+}
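Finally, a minimal sketch of how the asynchronous snapshot is driven end to end; it assumes an already initialized backend and a CheckpointStreamFactory (for example the one createStreamFactory() returns), and the checkpoint id is a placeholder:

import java.util.concurrent.RunnableFuture;

import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.heap.async.AsyncHeapKeyedStateBackend;

public final class AsyncSnapshotSketch {

    static <K> KeyGroupsStateHandle snapshotAndWait(
            AsyncHeapKeyedStateBackend<K> backend,
            CheckpointStreamFactory streamFactory) throws Exception {

        // synchronous part: snapshot() only captures copy-on-write snapshots of the
        // state tables and returns quickly
        RunnableFuture<KeyGroupsStateHandle> future =
                backend.snapshot(1L, System.currentTimeMillis(), streamFactory);

        // asynchronous part: serialization and writing happen when the future runs,
        // here on a separate thread so processing could continue in the meantime
        new Thread(future, "snapshot-writer").start();

        KeyGroupsStateHandle handle = future.get(); // blocks until the write finished

        // the handle can later be fed to a fresh backend instance:
        // restoredBackend.restore(java.util.Collections.singletonList(handle));
        return handle;
    }

    private AsyncSnapshotSketch() {}
}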