You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/03/16 17:34:28 UTC
[3/4] flink git commit: [FLINK-5715] Asynchronous snapshots for
heap-based keyed state backend
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
new file mode 100644
index 0000000..d63b6d3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -0,0 +1,1066 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.TreeSet;
+
+/**
+ * Implementation of Flink's in-memory state tables with copy-on-write support. This map does not support null values
+ * for key or namespace.
+ * <p>
+ * {@link CopyOnWriteStateTable} sacrifices some peak performance and memory efficiency for features like incremental
+ * rehashing and asynchronous snapshots through copy-on-write. Copy-on-write tries to minimize the amount of copying by
+ * maintaining version meta data for both, the map structure and the state objects. However, we must often proactively
+ * copy state objects when we hand them to the user.
+ * <p>
+ * As for any state backend, user should not keep references on state objects that they obtained from state backends
+ * outside the scope of the user function calls.
+ * <p>
+ * Some brief maintenance notes:
+ * <p>
+ * 1) Flattening the underlying data structure from nested maps (namespace) -> (key) -> (state) to one flat map
+ * (key, namespace) -> (state) brings certain performance trade-offs. In theory, the flat map has one less level of
+ * indirection compared to the nested map. However, the nested map naturally de-duplicates namespace objects for which
+ * #equals() is true. This leads to potentially a lot of redundant namespace objects for the flattened version. Those,
+ * in turn, can again introduce more cache misses because we need to follow the namespace object on all operations to
+ * ensure entry identities. Obviously, copy-on-write can also add memory overhead. So does the meta data to track
+ * copy-on-write requirement (state and entry versions on {@link StateTableEntry}).
+ * <p>
+ * 2) A flat map structure is a lot easier when it comes to tracking copy-on-write of the map structure.
+ * <p>
+ * 3) Nested structure had the (never used) advantage that we can easily drop and iterate whole namespaces. This could
+ * give locality advantages for certain access pattern, e.g. iterating a namespace.
+ * <p>
+ * 4) Serialization format is changed from namespace-prefix compressed (as naturally provided from the old nested
+ * structure) to making all entries self contained as (key, namespace, state).
+ * <p>
+ * 5) We got rid of having multiple nested tables, one for each key-group. Instead, we partition state into key-groups
+ * on-the-fly, during the asynchronous part of a snapshot.
+ * <p>
+ * 6) Currently, a state table can only grow, but never shrinks on low load. We could easily add this if required.
+ * <p>
+ * 7) Heap based state backends like this can easily cause a lot of GC activity. Besides using G1 as garbage collector,
+ * we should provide an additional state backend that operates on off-heap memory. This would sacrifice peak performance
+ * (due to de/serialization of objects) for a lower, but more constant throughput and potentially huge simplifications
+ * w.r.t. copy-on-write.
+ * <p>
+ * 8) We could try a hybrid of a serialized and object based backends, where key and namespace of the entries are both
+ * serialized in one byte-array.
+ * <p>
+ * 9) We could consider smaller types (e.g. short) for the version counting and think about some reset strategy before
+ * overflows, when there is no snapshot running. However, this would have to touch all entries in the map.
+ * <p>
+ * This class was initially based on the {@link java.util.HashMap} implementation of the Android JDK, but is now heavily
+ * customized towards the use case of table for state entries.
+ *
+ * IMPORTANT: the contracts for this class rely on the user not holding any references to objects returned by this map
+ * beyond the life cycle of per-element operations. Or phrased differently, all get-update-put operations on a mapping
+ * should be within one call of processElement. Otherwise, the user must take care of taking deep copies, e.g. for
+ * caching purposes.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of value.
+ */
+public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implements Iterable<StateEntry<K, N, S>> {
+
+ /**
+ * The logger.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);
+
+ /**
+ * Min capacity (other than zero) for a {@link CopyOnWriteStateTable}. Must be a power of two
+ * greater than 1 (and less than 1 << 30).
+ */
+ private static final int MINIMUM_CAPACITY = 4;
+
+ /**
+ * Max capacity for a {@link CopyOnWriteStateTable}. Must be a power of two >= MINIMUM_CAPACITY.
+ */
+ private static final int MAXIMUM_CAPACITY = 1 << 30;
+
+ /**
+ * Minimum number of entries that one step of incremental rehashing migrates from the old to the new sub-table.
+ */
+ private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;
+
+ /**
+ * An empty table shared by all zero-capacity maps (typically from default
+ * constructor). It is never written to, and replaced on first put. Its size
+ * is set to half the minimum, so that the first resize will create a
+ * minimum-sized table.
+ */
+ private static final StateTableEntry<?, ?, ?>[] EMPTY_TABLE = new StateTableEntry[MINIMUM_CAPACITY >>> 1];
+
+ /**
+ * Empty entry that we use to bootstrap our {@link CopyOnWriteStateTable.StateEntryIterator}.
+ */
+ private static final StateTableEntry<?, ?, ?> ITERATOR_BOOTSTRAP_ENTRY = new StateTableEntry<>();
+
+ /**
+ * Maintains an ordered set of version ids that are still in use by unreleased snapshots.
+ */
+ private final TreeSet<Integer> snapshotVersions;
+
+ /**
+ * This is the primary entry array (hash directory) of the state table. If no incremental rehash is ongoing, this
+ * is the only used table.
+ **/
+ private StateTableEntry<K, N, S>[] primaryTable;
+
+ /**
+ * We maintain a secondary entry array while performing an incremental rehash. The purpose is to slowly migrate
+ * entries from the primary table to this resized table array. When all entries are migrated, this becomes the new
+ * primary table.
+ */
+ private StateTableEntry<K, N, S>[] incrementalRehashTable;
+
+ /**
+ * The current number of mappings in the primary table.
+ */
+ private int primaryTableSize;
+
+ /**
+ * The current number of mappings in the rehash table.
+ */
+ private int incrementalRehashTableSize;
+
+ /**
+ * The next index for a step of incremental rehashing in the primary table.
+ */
+ private int rehashIndex;
+
+ /**
+ * The current version of this map. Used for copy-on-write mechanics.
+ */
+ private int stateTableVersion;
+
+ /**
+ * The highest version of this map that is still required by any unreleased snapshot.
+ */
+ private int highestRequiredSnapshotVersion;
+
+ /**
+ * The last namespace that was actually inserted. This is a small optimization to reduce duplicate namespace objects.
+ */
+ private N lastNamespace;
+
+ /**
+ * The {@link CopyOnWriteStateTable} is rehashed when its size exceeds this threshold.
+ * The value of this field is generally .75 * capacity, except when
+ * the capacity is zero, as described in the EMPTY_TABLE declaration
+ * above.
+ */
+ private int threshold;
+
+ /**
+ * Incremented by "structural modifications" to allow (best effort)
+ * detection of concurrent modification.
+ */
+ private int modCount;
+
+ /**
+ * Constructs a new {@code StateTable} with default capacity of 1024.
+ *
+ * @param keyContext the key context.
+ * @param metaInfo the meta information, including the type serializer for state copy-on-write.
+ */
+ CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+ this(keyContext, metaInfo, 1024);
+ }
+
+ /**
+ * Constructs a new {@code StateTable} instance with the specified capacity.
+ *
+ * @param keyContext the key context.
+ * @param metaInfo the meta information, including the type serializer for state copy-on-write.
+ * @param capacity the initial capacity of this hash map.
+ * @throws IllegalArgumentException when the capacity is less than zero.
+ */
+ @SuppressWarnings("unchecked")
+ private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo, int capacity) {
+ super(keyContext, metaInfo);
+
+ // initialized tables to EMPTY_TABLE.
+ this.primaryTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
+ this.incrementalRehashTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
+
+ // initialize sizes to 0.
+ this.primaryTableSize = 0;
+ this.incrementalRehashTableSize = 0;
+
+ this.rehashIndex = 0;
+ this.stateTableVersion = 0;
+ this.highestRequiredSnapshotVersion = 0;
+ this.snapshotVersions = new TreeSet<>();
+
+ if (capacity < 0) {
+ throw new IllegalArgumentException("Capacity: " + capacity);
+ }
+
+ if (capacity == 0) {
+ threshold = -1;
+ return;
+ }
+
+ if (capacity < MINIMUM_CAPACITY) {
+ capacity = MINIMUM_CAPACITY;
+ } else if (capacity > MAXIMUM_CAPACITY) {
+ capacity = MAXIMUM_CAPACITY;
+ } else {
+ capacity = MathUtils.roundUpToPowerOfTwo(capacity);
+ }
+ primaryTable = makeTable(capacity);
+ }
+
+ // Public API from AbstractStateTable ------------------------------------------------------------------------------
+
+ /**
+ * Returns the total number of entries in this {@link CopyOnWriteStateTable}. This is the sum of both sub-tables.
+ *
+ * @return the number of entries in this {@link CopyOnWriteStateTable}.
+ */
+ @Override
+ public int size() {
+ return primaryTableSize + incrementalRehashTableSize;
+ }
+
+ @Override
+ public S get(K key, N namespace) {
+
+ final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+ final int requiredVersion = highestRequiredSnapshotVersion;
+ final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+ int index = hash & (tab.length - 1);
+
+ for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
+ final K eKey = e.key;
+ final N eNamespace = e.namespace;
+ if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {
+
+ // copy-on-write check for state
+ if (e.stateVersion < requiredVersion) {
+ // copy-on-write check for entry
+ if (e.entryVersion < requiredVersion) {
+ e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
+ }
+ e.stateVersion = stateTableVersion;
+ e.state = getStateSerializer().copy(e.state);
+ }
+
+ return e.state;
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public void put(K key, int keyGroup, N namespace, S state) {
+ put(key, namespace, state);
+ }
+
+ @Override
+ public S get(N namespace) {
+ return get(keyContext.getCurrentKey(), namespace);
+ }
+
+ @Override
+ public boolean containsKey(N namespace) {
+ return containsKey(keyContext.getCurrentKey(), namespace);
+ }
+
+ @Override
+ public void put(N namespace, S state) {
+ put(keyContext.getCurrentKey(), namespace, state);
+ }
+
+ @Override
+ public S putAndGetOld(N namespace, S state) {
+ return putAndGetOld(keyContext.getCurrentKey(), namespace, state);
+ }
+
+ @Override
+ public void remove(N namespace) {
+ remove(keyContext.getCurrentKey(), namespace);
+ }
+
+ @Override
+ public S removeAndGetOld(N namespace) {
+ return removeAndGetOld(keyContext.getCurrentKey(), namespace);
+ }
+
+ @Override
+ public <T> void transform(N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception {
+ transform(keyContext.getCurrentKey(), namespace, value, transformation);
+ }
+
+ // Private implementation details of the API methods ---------------------------------------------------------------
+
+ /**
+ * Returns whether this table contains the specified key/namespace composite key.
+ *
+ * @param key the key in the composite key to search for. Not null.
+ * @param namespace the namespace in the composite key to search for. Not null.
+ * @return {@code true} if this map contains the specified key/namespace composite key,
+ * {@code false} otherwise.
+ */
+ boolean containsKey(K key, N namespace) {
+
+ final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+ final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+ int index = hash & (tab.length - 1);
+
+ for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
+ final K eKey = e.key;
+ final N eNamespace = e.namespace;
+
+ if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Maps the specified key/namespace composite key to the specified value. This method should be preferred
+ * over {@link #putAndGetOld(Object, Object, Object)} (Object, Object)} when the caller is not interested
+ * in the old value, because this can potentially reduce copy-on-write activity.
+ *
+ * @param key the key. Not null.
+ * @param namespace the namespace. Not null.
+ * @param value the value. Can be null.
+ */
+ void put(K key, N namespace, S value) {
+ final StateTableEntry<K, N, S> e = putEntry(key, namespace);
+
+ e.state = value;
+ e.stateVersion = stateTableVersion;
+ }
+
+ /**
+ * Maps the specified key/namespace composite key to the specified value. Returns the previous state that was
+ * registered under the composite key.
+ *
+ * @param key the key. Not null.
+ * @param namespace the namespace. Not null.
+ * @param value the value. Can be null.
+ * @return the value of any previous mapping with the specified key or
+ * {@code null} if there was no such mapping.
+ */
+ S putAndGetOld(K key, N namespace, S value) {
+
+ final StateTableEntry<K, N, S> e = putEntry(key, namespace);
+
+ // copy-on-write check for state
+ S oldState = (e.stateVersion < highestRequiredSnapshotVersion) ?
+ getStateSerializer().copy(e.state) :
+ e.state;
+
+ e.state = value;
+ e.stateVersion = stateTableVersion;
+
+ return oldState;
+ }
+
+ /**
+ * Removes the mapping with the specified key/namespace composite key from this map. This method should be preferred
+ * over {@link #removeAndGetOld(Object, Object)} when the caller is not interested in the old value, because this
+ * can potentially reduce copy-on-write activity.
+ *
+ * @param key the key of the mapping to remove. Not null.
+ * @param namespace the namespace of the mapping to remove. Not null.
+ */
+ void remove(K key, N namespace) {
+ removeEntry(key, namespace);
+ }
+
+ /**
+ * Removes the mapping with the specified key/namespace composite key from this map, returning the state that was
+ * found under the entry.
+ *
+ * @param key the key of the mapping to remove. Not null.
+ * @param namespace the namespace of the mapping to remove. Not null.
+ * @return the value of the removed mapping or {@code null} if no mapping
+ * for the specified key was found.
+ */
+ S removeAndGetOld(K key, N namespace) {
+
+ final StateTableEntry<K, N, S> e = removeEntry(key, namespace);
+
+ return e != null ?
+ // copy-on-write check for state
+ (e.stateVersion < highestRequiredSnapshotVersion ?
+ getStateSerializer().copy(e.state) :
+ e.state) :
+ null;
+ }
+
+ /**
+ * @param key the key of the mapping to remove. Not null.
+ * @param namespace the namespace of the mapping to remove. Not null.
+ * @param value the value that is the second input for the transformation.
+ * @param transformation the transformation function to apply on the old state and the given value.
+ * @param <T> type of the value that is the second input to the {@link StateTransformationFunction}.
+ * @throws Exception exception that happen on applying the function.
+ * @see #transform(Object, Object, StateTransformationFunction).
+ */
+ <T> void transform(
+ K key,
+ N namespace,
+ T value,
+ StateTransformationFunction<S, T> transformation) throws Exception {
+
+ final StateTableEntry<K, N, S> entry = putEntry(key, namespace);
+
+ // copy-on-write check for state
+ entry.state = transformation.apply(
+ (entry.stateVersion < highestRequiredSnapshotVersion) ?
+ getStateSerializer().copy(entry.state) :
+ entry.state,
+ value);
+ entry.stateVersion = stateTableVersion;
+ }
+
+ /**
+ * Helper method that is the basis for operations that add mappings.
+ */
+ private StateTableEntry<K, N, S> putEntry(K key, N namespace) {
+
+ final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+ final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+ int index = hash & (tab.length - 1);
+
+ for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
+ if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
+
+ // copy-on-write check for entry
+ if (e.entryVersion < highestRequiredSnapshotVersion) {
+ e = handleChainedEntryCopyOnWrite(tab, index, e);
+ }
+
+ return e;
+ }
+ }
+
+ ++modCount;
+ if (size() > threshold) {
+ doubleCapacity();
+ }
+
+ return addNewStateTableEntry(tab, key, namespace, hash);
+ }
+
+ /**
+ * Helper method that is the basis for operations that remove mappings.
+ */
+ private StateTableEntry<K, N, S> removeEntry(K key, N namespace) {
+
+ final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
+ final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
+ int index = hash & (tab.length - 1);
+
+ for (StateTableEntry<K, N, S> e = tab[index], prev = null; e != null; prev = e, e = e.next) {
+ if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
+ if (prev == null) {
+ tab[index] = e.next;
+ } else {
+ // copy-on-write check for entry
+ if (prev.entryVersion < highestRequiredSnapshotVersion) {
+ prev = handleChainedEntryCopyOnWrite(tab, index, prev);
+ }
+ prev.next = e.next;
+ }
+ ++modCount;
+ if (tab == primaryTable) {
+ --primaryTableSize;
+ } else {
+ --incrementalRehashTableSize;
+ }
+ return e;
+ }
+ }
+ return null;
+ }
+
+ private void checkKeyNamespacePreconditions(K key, N namespace) {
+ Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
+ Preconditions.checkNotNull(namespace, "Provided namespace is null.");
+ }
+
+ // Meta data setter / getter and toString --------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializer<S> getStateSerializer() {
+ return metaInfo.getStateSerializer();
+ }
+
+ @Override
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return metaInfo.getNamespaceSerializer();
+ }
+
+ @Override
+ public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() {
+ return metaInfo;
+ }
+
+ @Override
+ public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+ this.metaInfo = metaInfo;
+ }
+
+ // Iteration ------------------------------------------------------------------------------------------------------
+
+ @Override
+ public Iterator<StateEntry<K, N, S>> iterator() {
+ return new StateEntryIterator();
+ }
+
+ // Private utility functions for StateTable management -------------------------------------------------------------
+
+ /**
+ * @see #releaseSnapshot(CopyOnWriteStateTableSnapshot)
+ */
+ @VisibleForTesting
+ void releaseSnapshot(int snapshotVersion) {
+ // we guard against concurrent modifications of highestRequiredSnapshotVersion between snapshot and release.
+ // Only stale reads of from the result of #releaseSnapshot calls are ok.
+ synchronized (snapshotVersions) {
+ Preconditions.checkState(snapshotVersions.remove(snapshotVersion), "Attempt to release unknown snapshot version");
+ highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ? 0 : snapshotVersions.last();
+ }
+ }
+
+ /**
+ * Creates (combined) copy of the table arrays for a snapshot. This method must be called by the same Thread that
+ * does modifications to the {@link CopyOnWriteStateTable}.
+ */
+ @VisibleForTesting
+ @SuppressWarnings("unchecked")
+ StateTableEntry<K, N, S>[] snapshotTableArrays() {
+
+ // we guard against concurrent modifications of highestRequiredSnapshotVersion between snapshot and release.
+ // Only stale reads of from the result of #releaseSnapshot calls are ok. This is why we must call this method
+ // from the same thread that does all the modifications to the table.
+ synchronized (snapshotVersions) {
+
+ // increase the table version for copy-on-write and register the snapshot
+ if (++stateTableVersion < 0) {
+ // this is just a safety net against overflows, but should never happen in practice (i.e., only after 2^31 snapshots)
+ throw new IllegalStateException("Version count overflow in CopyOnWriteStateTable. Enforcing restart.");
+ }
+
+ highestRequiredSnapshotVersion = stateTableVersion;
+ snapshotVersions.add(highestRequiredSnapshotVersion);
+ }
+
+ StateTableEntry<K, N, S>[] table = primaryTable;
+ if (isRehashing()) {
+ // consider both tables for the snapshot, the rehash index tells us which part of the two tables we need
+ final int localRehashIndex = rehashIndex;
+ final int localCopyLength = table.length - localRehashIndex;
+ StateTableEntry<K, N, S>[] copy = new StateTableEntry[localRehashIndex + table.length];
+ // for the primary table, take every index >= rhIdx.
+ System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);
+
+ // for the new table, we are sure that two regions contain all the entries:
+ // [0, rhIdx[ AND [table.length / 2, table.length / 2 + rhIdx[
+ table = incrementalRehashTable;
+ System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
+ System.arraycopy(table, table.length >>> 1, copy, localCopyLength + localRehashIndex, localRehashIndex);
+
+ return copy;
+ } else {
+ // we only need to copy the primary table
+ return Arrays.copyOf(table, table.length);
+ }
+ }
+
+ /**
+ * Allocate a table of the given capacity and set the threshold accordingly.
+ *
+ * @param newCapacity must be a power of two
+ */
+ private StateTableEntry<K, N, S>[] makeTable(int newCapacity) {
+
+ if (MAXIMUM_CAPACITY == newCapacity) {
+ LOG.warn("Maximum capacity of 2^30 in StateTable reached. Cannot increase hash table size. This can lead " +
+ "to more collisions and lower performance. Please consider scaling-out your job or using a " +
+ "different keyed state backend implementation!");
+ }
+
+ threshold = (newCapacity >> 1) + (newCapacity >> 2); // 3/4 capacity
+ @SuppressWarnings("unchecked") StateTableEntry<K, N, S>[] newTable
+ = (StateTableEntry<K, N, S>[]) new StateTableEntry[newCapacity];
+ return newTable;
+ }
+
+ /**
+ * Creates and inserts a new {@link StateTableEntry}.
+ */
+ private StateTableEntry<K, N, S> addNewStateTableEntry(
+ StateTableEntry<K, N, S>[] table,
+ K key,
+ N namespace,
+ int hash) {
+
+ // small optimization that aims to avoid holding references on duplicate namespace objects
+ if (namespace.equals(lastNamespace)) {
+ namespace = lastNamespace;
+ } else {
+ lastNamespace = namespace;
+ }
+
+ int index = hash & (table.length - 1);
+ StateTableEntry<K, N, S> newEntry = new StateTableEntry<>(
+ key,
+ namespace,
+ null,
+ hash,
+ table[index],
+ stateTableVersion,
+ stateTableVersion);
+ table[index] = newEntry;
+
+ if (table == primaryTable) {
+ ++primaryTableSize;
+ } else {
+ ++incrementalRehashTableSize;
+ }
+ return newEntry;
+ }
+
+ /**
+ * Select the sub-table which is responsible for entries with the given hash code.
+ *
+ * @param hashCode the hash code which we use to decide about the table that is responsible.
+ * @return the index of the sub-table that is responsible for the entry with the given hash code.
+ */
+ private StateTableEntry<K, N, S>[] selectActiveTable(int hashCode) {
+ return (hashCode & (primaryTable.length - 1)) >= rehashIndex ? primaryTable : incrementalRehashTable;
+ }
+
+ /**
+ * Doubles the capacity of the hash table. Existing entries are placed in
+ * the correct bucket on the enlarged table. If the current capacity is,
+ * MAXIMUM_CAPACITY, this method is a no-op. Returns the table, which
+ * will be new unless we were already at MAXIMUM_CAPACITY.
+ */
+ private void doubleCapacity() {
+
+ // There can only be one rehash in flight. From the amount of incremental rehash steps we take, this should always hold.
+ Preconditions.checkState(!isRehashing(), "There is already a rehash in progress.");
+
+ StateTableEntry<K, N, S>[] oldTable = primaryTable;
+
+ int oldCapacity = oldTable.length;
+
+ if (oldCapacity == MAXIMUM_CAPACITY) {
+ return;
+ }
+
+ incrementalRehashTable = makeTable(oldCapacity * 2);
+ }
+
+ /**
+ * Returns true, if an incremental rehash is in progress.
+ */
+ @VisibleForTesting
+ boolean isRehashing() {
+ // if we rehash, the secondary table is not empty
+ return EMPTY_TABLE != incrementalRehashTable;
+ }
+
+ /**
+ * Computes the hash for the composite of key and namespace and performs some steps of incremental rehash if
+ * incremental rehashing is in progress.
+ */
+ private int computeHashForOperationAndDoIncrementalRehash(K key, N namespace) {
+
+ checkKeyNamespacePreconditions(key, namespace);
+
+ if (isRehashing()) {
+ incrementalRehash();
+ }
+
+ return compositeHash(key, namespace);
+ }
+
+ /**
+ * Runs a number of steps for incremental rehashing.
+ */
+ @SuppressWarnings("unchecked")
+ private void incrementalRehash() {
+
+ StateTableEntry<K, N, S>[] oldTable = primaryTable;
+ StateTableEntry<K, N, S>[] newTable = incrementalRehashTable;
+
+ int oldCapacity = oldTable.length;
+ int newMask = newTable.length - 1;
+ int requiredVersion = highestRequiredSnapshotVersion;
+ int rhIdx = rehashIndex;
+ int transferred = 0;
+
+ // we migrate a certain minimum amount of entries from the old to the new table
+ while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {
+
+ StateTableEntry<K, N, S> e = oldTable[rhIdx];
+
+ while (e != null) {
+ // copy-on-write check for entry
+ if (e.entryVersion < requiredVersion) {
+ e = new StateTableEntry<>(e, stateTableVersion);
+ }
+ StateTableEntry<K, N, S> n = e.next;
+ int pos = e.hash & newMask;
+ e.next = newTable[pos];
+ newTable[pos] = e;
+ e = n;
+ ++transferred;
+ }
+
+ oldTable[rhIdx] = null;
+ if (++rhIdx == oldCapacity) {
+ //here, the rehash is complete and we release resources and reset fields
+ primaryTable = newTable;
+ incrementalRehashTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
+ primaryTableSize += incrementalRehashTableSize;
+ incrementalRehashTableSize = 0;
+ rehashIndex = 0;
+ return;
+ }
+ }
+
+ // sync our local bookkeeping the with official bookkeeping fields
+ primaryTableSize -= transferred;
+ incrementalRehashTableSize += transferred;
+ rehashIndex = rhIdx;
+ }
+
+ /**
+ * Perform copy-on-write for entry chains. We iterate the (hopefully and probably) still cached chain, replace
+ * all links up to the 'untilEntry', which we actually wanted to modify.
+ */
+ private StateTableEntry<K, N, S> handleChainedEntryCopyOnWrite(
+ StateTableEntry<K, N, S>[] tab,
+ int tableIdx,
+ StateTableEntry<K, N, S> untilEntry) {
+
+ final int required = highestRequiredSnapshotVersion;
+
+ StateTableEntry<K, N, S> current = tab[tableIdx];
+ StateTableEntry<K, N, S> copy;
+
+ if (current.entryVersion < required) {
+ copy = new StateTableEntry<>(current, stateTableVersion);
+ tab[tableIdx] = copy;
+ } else {
+ // nothing to do, just advance copy to current
+ copy = current;
+ }
+
+ // we iterate the chain up to 'until entry'
+ while (current != untilEntry) {
+
+ //advance current
+ current = current.next;
+
+ if (current.entryVersion < required) {
+ // copy and advance the current's copy
+ copy.next = new StateTableEntry<>(current, stateTableVersion);
+ copy = copy.next;
+ } else {
+ // nothing to do, just advance copy to current
+ copy = current;
+ }
+ }
+
+ return copy;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <K, N, S> StateTableEntry<K, N, S> getBootstrapEntry() {
+ return (StateTableEntry<K, N, S>) ITERATOR_BOOTSTRAP_ENTRY;
+ }
+
+ /**
+ * Helper function that creates and scrambles a composite hash for key and namespace.
+ */
+ private static int compositeHash(Object key, Object namespace) {
+ // create composite key through XOR, then apply some bit-mixing for better distribution of skewed keys.
+ return MathUtils.bitMix(key.hashCode() ^ namespace.hashCode());
+ }
+
+ // Snapshotting ----------------------------------------------------------------------------------------------------
+
+ int getStateTableVersion() {
+ return stateTableVersion;
+ }
+
+ /**
+ * Creates a snapshot of this {@link CopyOnWriteStateTable}, to be written in checkpointing. The snapshot integrity
+ * is protected through copy-on-write from the {@link CopyOnWriteStateTable}. Users should call
+ * {@link #releaseSnapshot(CopyOnWriteStateTableSnapshot)} after using the returned object.
+ *
+ * @return a snapshot from this {@link CopyOnWriteStateTable}, for checkpointing.
+ */
+ @Override
+ public CopyOnWriteStateTableSnapshot<K, N, S> createSnapshot() {
+ return new CopyOnWriteStateTableSnapshot<>(this);
+ }
+
+ /**
+ * Releases a snapshot for this {@link CopyOnWriteStateTable}. This method should be called once a snapshot is no more needed,
+ * so that the {@link CopyOnWriteStateTable} can stop considering this snapshot for copy-on-write, thus avoiding unnecessary
+ * object creation.
+ *
+ * @param snapshotToRelease the snapshot to release, which was previously created by this state table.
+ */
+ void releaseSnapshot(CopyOnWriteStateTableSnapshot<K, N, S> snapshotToRelease) {
+
+ Preconditions.checkArgument(snapshotToRelease.isOwner(this),
+ "Cannot release snapshot which is owned by a different state table.");
+
+ releaseSnapshot(snapshotToRelease.getSnapshotVersion());
+ }
+
+ // StateTableEntry -------------------------------------------------------------------------------------------------
+
+ /**
+ * One entry in the {@link CopyOnWriteStateTable}. This is a triplet of key, namespace, and state. Thereby, key and
+ * namespace together serve as a composite key for the state. This class also contains some management meta data for
+ * copy-on-write, a pointer to link other {@link StateTableEntry}s to a list, and cached hash code.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+ static class StateTableEntry<K, N, S> implements StateEntry<K, N, S> {
+
+ /**
+ * The key. Assumed to be immutable and not null.
+ */
+ final K key;
+
+ /**
+ * The namespace. Assumed to be immutable and not null.
+ */
+ final N namespace;
+
+ /**
+ * The state. This is not final to allow exchanging the object for copy-on-write. Can be null.
+ */
+ S state;
+
+ /**
+ * Link to another {@link StateTableEntry}. This is used to resolve collisions in the
+ * {@link CopyOnWriteStateTable} through chaining.
+ */
+ StateTableEntry<K, N, S> next;
+
+ /**
+ * The version of this {@link StateTableEntry}. This is meta data for copy-on-write of the table structure.
+ */
+ int entryVersion;
+
+ /**
+ * The version of the state object in this entry. This is meta data for copy-on-write of the state object itself.
+ */
+ int stateVersion;
+
+ /**
+ * The computed secondary hash for the composite of key and namespace.
+ */
+ final int hash;
+
+ StateTableEntry() {
+ this(null, null, null, 0, null, 0, 0);
+ }
+
+ StateTableEntry(StateTableEntry<K, N, S> other, int entryVersion) {
+ this(other.key, other.namespace, other.state, other.hash, other.next, entryVersion, other.stateVersion);
+ }
+
+ StateTableEntry(
+ K key,
+ N namespace,
+ S state,
+ int hash,
+ StateTableEntry<K, N, S> next,
+ int entryVersion,
+ int stateVersion) {
+ this.key = key;
+ this.namespace = namespace;
+ this.hash = hash;
+ this.next = next;
+ this.entryVersion = entryVersion;
+ this.state = state;
+ this.stateVersion = stateVersion;
+ }
+
+ public final void setState(S value, int mapVersion) {
+ // naturally, we can update the state version every time we replace the old state with a different object
+ if (value != state) {
+ this.state = value;
+ this.stateVersion = mapVersion;
+ }
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public N getNamespace() {
+ return namespace;
+ }
+
+ @Override
+ public S getState() {
+ return state;
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (!(o instanceof CopyOnWriteStateTable.StateTableEntry)) {
+ return false;
+ }
+
+ StateEntry<?, ?, ?> e = (StateEntry<?, ?, ?>) o;
+ return e.getKey().equals(key)
+ && e.getNamespace().equals(namespace)
+ && Objects.equals(e.getState(), state);
+ }
+
+ @Override
+ public final int hashCode() {
+ return (key.hashCode() ^ namespace.hashCode()) ^ Objects.hashCode(state);
+ }
+
+ @Override
+ public final String toString() {
+ return "(" + key + "|" + namespace + ")=" + state;
+ }
+ }
+
+ // For testing ----------------------------------------------------------------------------------------------------
+
+ @Override
+ public int sizeOfNamespace(Object namespace) {
+ int count = 0;
+ for (StateEntry<K, N, S> entry : this) {
+ if (null != entry && namespace.equals(entry.getNamespace())) {
+ ++count;
+ }
+ }
+ return count;
+ }
+
+
+ // StateEntryIterator ---------------------------------------------------------------------------------------------
+
+ /**
+ * Iterator over the entries in a {@link CopyOnWriteStateTable}.
+ */
+ class StateEntryIterator implements Iterator<StateEntry<K, N, S>> {
+ private StateTableEntry<K, N, S>[] activeTable;
+ private int nextTablePosition;
+ private StateTableEntry<K, N, S> nextEntry;
+ private int expectedModCount = modCount;
+
+ StateEntryIterator() {
+ this.activeTable = primaryTable;
+ this.nextTablePosition = 0;
+ this.expectedModCount = modCount;
+ this.nextEntry = getBootstrapEntry();
+ advanceIterator();
+ }
+
+ private StateTableEntry<K, N, S> advanceIterator() {
+
+ StateTableEntry<K, N, S> entryToReturn = nextEntry;
+ StateTableEntry<K, N, S> next = entryToReturn.next;
+
+ // consider both sub-tables tables to cover the case of rehash
+ while (next == null) {
+
+ StateTableEntry<K, N, S>[] tab = activeTable;
+
+ while (nextTablePosition < tab.length) {
+ next = tab[nextTablePosition++];
+
+ if (next != null) {
+ nextEntry = next;
+ return entryToReturn;
+ }
+ }
+
+ if (activeTable == incrementalRehashTable) {
+ break;
+ }
+
+ activeTable = incrementalRehashTable;
+ nextTablePosition = 0;
+ }
+
+ nextEntry = next;
+ return entryToReturn;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nextEntry != null;
+ }
+
+ @Override
+ public StateTableEntry<K, N, S> next() {
+ if (modCount != expectedModCount) {
+ throw new ConcurrentModificationException();
+ }
+
+ if (nextEntry == null) {
+ throw new NoSuchElementException();
+ }
+
+ return advanceIterator();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Read-only iterator");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
new file mode 100644
index 0000000..c83fce0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import java.io.IOException;
+
+/**
+ * This class represents the snapshot of a {@link CopyOnWriteStateTable} and has a role in operator state checkpointing. Besides
+ * holding the {@link CopyOnWriteStateTable}s internal entries at the time of the snapshot, this class is also responsible for
+ * preparing and writing the state in the process of checkpointing.
+ * <p>
+ * IMPORTANT: Please notice that snapshot integrity of entries in this class rely on proper copy-on-write semantics
+ * through the {@link CopyOnWriteStateTable} that created the snapshot object, but all objects in this snapshot must be considered
+ * as READ-ONLY!. The reason is that the objects held by this class may or may not be deep copies of original objects
+ * that may still used in the {@link CopyOnWriteStateTable}. This depends for each entry on whether or not it was subject to
+ * copy-on-write operations by the {@link CopyOnWriteStateTable}. Phrased differently: the {@link CopyOnWriteStateTable} provides
+ * copy-on-write isolation for this snapshot, but this snapshot does not isolate modifications from the
+ * {@link CopyOnWriteStateTable}!
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+@Internal
+public class CopyOnWriteStateTableSnapshot<K, N, S>
+ extends AbstractStateTableSnapshot<K, N, S, CopyOnWriteStateTable<K, N, S>> {
+
+ /**
+ * Version of the {@link CopyOnWriteStateTable} when this snapshot was created. This can be used to release the snapshot.
+ */
+ private final int snapshotVersion;
+
+ /**
+ * The number of entries in the {@link CopyOnWriteStateTable} at the time of creating this snapshot.
+ */
+ private final int stateTableSize;
+
+ /**
+ * The state table entries, as by the time this snapshot was created. Objects in this array may or may not be deep
+ * copies of the current entries in the {@link CopyOnWriteStateTable} that created this snapshot. This depends for each entry
+ * on whether or not it was subject to copy-on-write operations by the {@link CopyOnWriteStateTable}.
+ */
+ private final CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshotData;
+
+ /**
+ * Offsets for the individual key-groups. This is lazily created when the snapshot is grouped by key-group during
+ * the process of writing this snapshot to an output as part of checkpointing.
+ */
+ private int[] keyGroupOffsets;
+
+ /**
+ * Creates a new {@link CopyOnWriteStateTableSnapshot}.
+ *
+ * @param owningStateTable the {@link CopyOnWriteStateTable} for which this object represents a snapshot.
+ */
+ CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K, N, S> owningStateTable) {
+
+ super(owningStateTable);
+ this.snapshotData = owningStateTable.snapshotTableArrays();
+ this.snapshotVersion = owningStateTable.getStateTableVersion();
+ this.stateTableSize = owningStateTable.size();
+ this.keyGroupOffsets = null;
+ }
+
+ /**
+ * Returns the internal version of the {@link CopyOnWriteStateTable} when this snapshot was created. This value must be used to
+ * tell the {@link CopyOnWriteStateTable} when to release this snapshot.
+ */
+ int getSnapshotVersion() {
+ return snapshotVersion;
+ }
+
+ /**
+ * Partitions the snapshot data by key-group. The algorithm first builds a histogram for the distribution of keys
+ * into key-groups. Then, the histogram is accumulated to obtain the boundaries of each key-group in an array.
+ * Last, we use the accumulated counts as write position pointers for the key-group's bins when reordering the
+ * entries by key-group. This operation is lazily performed before the first writing of a key-group.
+ * <p>
+ * As a possible future optimization, we could perform the repartitioning in-place, using a scheme similar to the
+ * cuckoo cycles in cuckoo hashing. This can trade some performance for a smaller memory footprint.
+ */
+ @SuppressWarnings("unchecked")
+ private void partitionEntriesByKeyGroup() {
+
+ // We only have to perform this step once before the first key-group is written
+ if (null != keyGroupOffsets) {
+ return;
+ }
+
+ final KeyGroupRange keyGroupRange = owningStateTable.keyContext.getKeyGroupRange();
+ final int totalKeyGroups = owningStateTable.keyContext.getNumberOfKeyGroups();
+ final int baseKgIdx = keyGroupRange.getStartKeyGroup();
+ final int[] histogram = new int[keyGroupRange.getNumberOfKeyGroups() + 1];
+
+ CopyOnWriteStateTable.StateTableEntry<K, N, S>[] unfold = new CopyOnWriteStateTable.StateTableEntry[stateTableSize];
+
+ // 1) In this step we i) 'unfold' the linked list of entries to a flat array and ii) build a histogram for key-groups
+ int unfoldIndex = 0;
+ for (CopyOnWriteStateTable.StateTableEntry<K, N, S> entry : snapshotData) {
+ while (null != entry) {
+ int effectiveKgIdx =
+ KeyGroupRangeAssignment.computeKeyGroupForKeyHash(entry.key.hashCode(), totalKeyGroups) - baseKgIdx + 1;
+ ++histogram[effectiveKgIdx];
+ unfold[unfoldIndex++] = entry;
+ entry = entry.next;
+ }
+ }
+
+ // 2) We accumulate the histogram bins to obtain key-group ranges in the final array
+ for (int i = 1; i < histogram.length; ++i) {
+ histogram[i] += histogram[i - 1];
+ }
+
+ // 3) We repartition the entries by key-group, using the histogram values as write indexes
+ for (CopyOnWriteStateTable.StateTableEntry<K, N, S> t : unfold) {
+ int effectiveKgIdx =
+ KeyGroupRangeAssignment.computeKeyGroupForKeyHash(t.key.hashCode(), totalKeyGroups) - baseKgIdx;
+ snapshotData[histogram[effectiveKgIdx]++] = t;
+ }
+
+ // 4) As byproduct, we also created the key-group offsets
+ this.keyGroupOffsets = histogram;
+ }
+
+ @Override
+ public void release() {
+ owningStateTable.releaseSnapshot(this);
+ }
+
+ @Override
+ public void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException {
+
+ if (null == keyGroupOffsets) {
+ partitionEntriesByKeyGroup();
+ }
+
+ final CopyOnWriteStateTable.StateTableEntry<K, N, S>[] groupedOut = snapshotData;
+ KeyGroupRange keyGroupRange = owningStateTable.keyContext.getKeyGroupRange();
+ int keyGroupOffsetIdx = keyGroupId - keyGroupRange.getStartKeyGroup() - 1;
+ int startOffset = keyGroupOffsetIdx < 0 ? 0 : keyGroupOffsets[keyGroupOffsetIdx];
+ int endOffset = keyGroupOffsets[keyGroupOffsetIdx + 1];
+
+ TypeSerializer<K> keySerializer = owningStateTable.keyContext.getKeySerializer();
+ TypeSerializer<N> namespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer();
+ TypeSerializer<S> stateSerializer = owningStateTable.metaInfo.getStateSerializer();
+
+ // write number of mappings in key-group
+ dov.writeInt(endOffset - startOffset);
+
+ // write mappings
+ for (int i = startOffset; i < endOffset; ++i) {
+ CopyOnWriteStateTable.StateTableEntry<K, N, S> toWrite = groupedOut[i];
+ groupedOut[i] = null; // free asap for GC
+ namespaceSerializer.serialize(toWrite.namespace, dov);
+ keySerializer.serialize(toWrite.key, dov);
+ stateSerializer.serialize(toWrite.state, dov);
+ }
+ }
+
+ /**
+ * Returns true iff the given state table is the owner of this snapshot object.
+ */
+ boolean isOwner(CopyOnWriteStateTable<K, N, S> stateTable) {
+ return stateTable == owningStateTable;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 624b83e..64fc1db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -23,18 +23,16 @@ import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+import org.apache.flink.util.Preconditions;
import java.io.IOException;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkState;
/**
* Heap-backed partitioned {@link ReducingState} that is
* snapshotted into files.
- *
+ *
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <IN> The type of the value added to the state.
@@ -45,13 +43,11 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
extends AbstractHeapMergingState<K, N, IN, OUT, ACC, AggregatingState<IN, OUT>, AggregatingStateDescriptor<IN, ACC, OUT>>
implements InternalAggregatingState<N, IN, OUT> {
- private final AggregateFunction<IN, ACC, OUT> aggFunction;
+ private final AggregateTransformation<IN, ACC, OUT> aggregateTransformation;
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param backend
- * The state backend backing that created this state.
* @param stateDesc
* The state identifier for the state. This contains name and can create a default state value.
* @param stateTable
@@ -60,14 +56,13 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
* The serializer for the type that indicates the namespace
*/
public HeapAggregatingState(
- KeyedStateBackend<K> backend,
AggregatingStateDescriptor<IN, ACC, OUT> stateDesc,
StateTable<K, N, ACC> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
- super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
- this.aggFunction = stateDesc.getAggregateFunction();
+ super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ this.aggregateTransformation = new AggregateTransformation<>(stateDesc.getAggregateFunction());
}
// ------------------------------------------------------------------------
@@ -76,64 +71,25 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
@Override
public OUT get() {
- final K key = backend.getCurrentKey();
-
- checkState(currentNamespace != null, "No namespace set.");
- checkState(key != null, "No key set.");
-
- Map<N, Map<K, ACC>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- return null;
- }
- Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return null;
- }
-
- ACC accumulator = keyedMap.get(key);
- return aggFunction.getResult(accumulator);
+ ACC accumulator = stateTable.get(currentNamespace);
+ return accumulator != null ? aggregateTransformation.aggFunction.getResult(accumulator) : null;
}
@Override
public void add(IN value) throws IOException {
- final K key = backend.getCurrentKey();
-
- checkState(currentNamespace != null, "No namespace set.");
- checkState(key != null, "No key set.");
+ final N namespace = currentNamespace;
if (value == null) {
clear();
return;
}
- Map<N, Map<K, ACC>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- namespaceMap = createNewMap();
- stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
- }
-
- Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- keyedMap = createNewMap();
- namespaceMap.put(currentNamespace, keyedMap);
- }
-
- // if this is the first value for the key, create a new accumulator
- ACC accumulator = keyedMap.get(key);
- if (accumulator == null) {
- accumulator = aggFunction.createAccumulator();
- keyedMap.put(key, accumulator);
+ try {
+ stateTable.transform(namespace, value, aggregateTransformation);
+ } catch (Exception e) {
+ throw new IOException("Exception while applying AggregateFunction in aggregating state", e);
}
-
- //
- aggFunction.add(value, accumulator);
}
// ------------------------------------------------------------------------
@@ -142,6 +98,24 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
@Override
protected ACC mergeState(ACC a, ACC b) throws Exception {
- return aggFunction.merge(a, b);
+ return aggregateTransformation.aggFunction.merge(a, b);
+ }
+
+ static final class AggregateTransformation<IN, ACC, OUT> implements StateTransformationFunction<ACC, IN> {
+
+ private final AggregateFunction<IN, ACC, OUT> aggFunction;
+
+ AggregateTransformation(AggregateFunction<IN, ACC, OUT> aggFunction) {
+ this.aggFunction = Preconditions.checkNotNull(aggFunction);
+ }
+
+ @Override
+ public ACC apply(ACC accumulator, IN value) throws Exception {
+ if (accumulator == null) {
+ accumulator = aggFunction.createAccumulator();
+ }
+ aggFunction.add(value, accumulator);
+ return accumulator;
+ }
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index 6df3f5d..dad6d0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -22,12 +22,11 @@ import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
-import java.util.Map;
/**
* Heap-backed partitioned {@link FoldingState} that is
@@ -43,24 +42,22 @@ public class HeapFoldingState<K, N, T, ACC>
implements InternalFoldingState<N, T, ACC> {
/** The function used to fold the state */
- private final FoldFunction<T, ACC> foldFunction;
+ private final FoldTransformation<T, ACC> foldTransformation;
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param backend The state backend backing that created this state.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
* @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
*/
public HeapFoldingState(
- KeyedStateBackend<K> backend,
FoldingStateDescriptor<T, ACC> stateDesc,
StateTable<K, N, ACC> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
- super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
- this.foldFunction = stateDesc.getFoldFunction();
+ super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ this.foldTransformation = new FoldTransformation<>(stateDesc);
}
// ------------------------------------------------------------------------
@@ -69,62 +66,37 @@ public class HeapFoldingState<K, N, T, ACC>
@Override
public ACC get() {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, ACC>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return null;
- }
-
- return keyedMap.get(backend.<K>getCurrentKey());
+ return stateTable.get(currentNamespace);
}
@Override
public void add(T value) throws IOException {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
if (value == null) {
clear();
return;
}
- Map<N, Map<K, ACC>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- namespaceMap = createNewMap();
- stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
+ try {
+ stateTable.transform(currentNamespace, value, foldTransformation);
+ } catch (Exception e) {
+ throw new IOException("Could not add value to folding state.", e);
}
+ }
- Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- keyedMap = createNewMap();
- namespaceMap.put(currentNamespace, keyedMap);
- }
+ static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
- ACC currentValue = keyedMap.get(backend.<K>getCurrentKey());
+ private final FoldingStateDescriptor<T, ACC> stateDescriptor;
+ private final FoldFunction<T, ACC> foldFunction;
- try {
+ FoldTransformation(FoldingStateDescriptor<T, ACC> stateDesc) {
+ this.stateDescriptor = Preconditions.checkNotNull(stateDesc);
+ this.foldFunction = Preconditions.checkNotNull(stateDesc.getFoldFunction());
+ }
- if (currentValue == null) {
- keyedMap.put(backend.<K>getCurrentKey(),
- foldFunction.fold(stateDesc.getDefaultValue(), value));
- } else {
- keyedMap.put(backend.<K>getCurrentKey(), foldFunction.fold(currentValue, value));
- }
- } catch (Exception e) {
- throw new RuntimeException("Could not add value to folding state.", e);
+ @Override
+ public ACC apply(ACC previousState, T value) throws Exception {
+ return foldFunction.fold((previousState != null) ? previousState : stateDescriptor.getDefaultValue(), value);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index a4a08c1..0335933 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.heap;
+import org.apache.commons.collections.map.HashedMap;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
@@ -29,18 +30,15 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.migration.MigrationUtil;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
-import org.apache.flink.migration.runtime.state.filesystem.AbstractFsStateSnapshot;
-import org.apache.flink.migration.runtime.state.memory.AbstractMemStateSnapshot;
+import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
+import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
+import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -54,8 +52,6 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalListState;
@@ -74,6 +70,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and will serialize state to
@@ -94,7 +91,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* but we can't put them here because different key/value states with different types and
* namespace types share this central list of tables.
*/
- private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
+ private final HashMap<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
+
+ /**
+ * Determines whether or not we run snapshots asynchronously. This impacts the choice of the underlying
+ * {@link StateTable} implementation.
+ */
+ private final boolean asynchronousSnapshots;
public HeapKeyedStateBackend(
TaskKvStateRegistry kvStateRegistry,
@@ -102,10 +105,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ClassLoader userCodeClassLoader,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
+ boolean asynchronousSnapshots,
ExecutionConfig executionConfig) {
super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
-
+ this.asynchronousSnapshots = asynchronousSnapshots;
LOG.info("Initializing heap keyed state backend with stream factory.");
}
@@ -124,7 +128,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private <N, V> StateTable<K, N, V> tryRegisterStateTable(
String stateName,
StateDescriptor.Type stateType,
- TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<N> namespaceSerializer,
TypeSerializer<V> valueSerializer) {
final RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
@@ -134,7 +138,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
if (stateTable == null) {
- stateTable = new StateTable<>(newMetaInfo, keyGroupRange);
+ stateTable = newStateTable(newMetaInfo);
stateTables.put(stateName, stateTable);
} else {
if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
@@ -152,7 +156,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ValueStateDescriptor<V> stateDesc) throws Exception {
StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapValueState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapValueState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@Override
@@ -170,7 +174,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
namespaceSerializer,
new ArrayListSerializer<T>(stateDesc.getElementSerializer()));
- return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapListState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@Override
@@ -179,7 +183,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ReducingStateDescriptor<T> stateDesc) throws Exception {
StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapReducingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapReducingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@Override
@@ -188,7 +192,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapAggregatingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapAggregatingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@Override
@@ -197,83 +201,151 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapFoldingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapFoldingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@Override
public <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer,
MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-
+
StateTable<K, N, HashMap<UK, UV>> stateTable = tryRegisterStateTable(
stateDesc.getName(),
stateDesc.getType(),
namespaceSerializer,
new HashMapSerializer<>(stateDesc.getKeySerializer(), stateDesc.getValueSerializer()));
-
- return new HeapMapState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+
+ return new HeapMapState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@Override
@SuppressWarnings("unchecked")
- public RunnableFuture<KeyGroupsStateHandle> snapshot(
- long checkpointId,
- long timestamp,
- CheckpointStreamFactory streamFactory,
+ public RunnableFuture<KeyGroupsStateHandle> snapshot(
+ final long checkpointId,
+ final long timestamp,
+ final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
if (stateTables.isEmpty()) {
return new DoneFuture<>(null);
}
- try (CheckpointStreamFactory.CheckpointStateOutputStream stream = streamFactory.
- createCheckpointStateOutputStream(checkpointId, timestamp)) {
-
- DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
+ long syncStartTime = System.currentTimeMillis();
- Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE,
- "Too many KV-States: " + stateTables.size() +
- ". Currently at most " + Short.MAX_VALUE + " states are supported");
+ Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE,
+ "Too many KV-States: " + stateTables.size() +
+ ". Currently at most " + Short.MAX_VALUE + " states are supported");
- List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size());
+ List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size());
- Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
+ final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
- for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+ final Map<StateTable<K, ?, ?>, StateTableSnapshot> cowStateStableSnapshots = new HashedMap(stateTables.size());
- RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
- KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
- metaInfo.getStateType(),
- metaInfo.getName(),
- metaInfo.getNamespaceSerializer(),
- metaInfo.getStateSerializer());
+ for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+ RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
+ KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
+ metaInfo.getStateType(),
+ metaInfo.getName(),
+ metaInfo.getNamespaceSerializer(),
+ metaInfo.getStateSerializer());
- metaInfoProxyList.add(metaInfoProxy);
- kVStateToId.put(kvState.getKey(), kVStateToId.size());
+ metaInfoProxyList.add(metaInfoProxy);
+ kVStateToId.put(kvState.getKey(), kVStateToId.size());
+ StateTable<K, ?, ?> stateTable = kvState.getValue();
+ if (null != stateTable) {
+ cowStateStableSnapshots.put(stateTable, stateTable.createSnapshot());
}
+ }
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+ final KeyedBackendSerializationProxy serializationProxy =
+ new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+
+ //--------------------------------------------------- this becomes the end of sync part
+
+ // implementation of the async IO operation, based on FutureTask
+ final AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
+ new AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
+
+ AtomicBoolean open = new AtomicBoolean(false);
+
+ @Override
+ public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
+ if (open.compareAndSet(false, true)) {
+ CheckpointStreamFactory.CheckpointStateOutputStream stream =
+ streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+ try {
+ cancelStreamRegistry.registerClosable(stream);
+ return stream;
+ } catch (Exception ex) {
+ open.set(false);
+ throw ex;
+ }
+ } else {
+ throw new IOException("Operation already opened.");
+ }
+ }
- serializationProxy.write(outView);
+ @Override
+ public KeyGroupsStateHandle performOperation() throws Exception {
+ long asyncStartTime = System.currentTimeMillis();
+ CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+ DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
+ serializationProxy.write(outView);
- int offsetCounter = 0;
- long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
+ long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
- for (int keyGroupIndex = keyGroupRange.getStartKeyGroup(); keyGroupIndex <= keyGroupRange.getEndKeyGroup(); keyGroupIndex++) {
- keyGroupRangeOffsets[offsetCounter++] = stream.getPos();
- outView.writeInt(keyGroupIndex);
- for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
- outView.writeShort(kVStateToId.get(kvState.getKey()));
- writeStateTableForKeyGroup(outView, kvState.getValue(), keyGroupIndex);
- }
- }
+ for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
+ int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
+ keyGroupRangeOffsets[keyGroupPos] = stream.getPos();
+ outView.writeInt(keyGroupId);
+
+ for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+ outView.writeShort(kVStateToId.get(kvState.getKey()));
+ cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(outView, keyGroupId);
+ }
+ }
+
+ if (open.compareAndSet(true, false)) {
+ StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
+ KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
+ final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
+
+ if (asynchronousSnapshots) {
+ LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.",
+ streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
+ }
+
+ return keyGroupsStateHandle;
+ } else {
+ throw new IOException("Checkpoint stream already closed.");
+ }
+ }
+
+ @Override
+ public void done(boolean canceled) {
+ if (open.compareAndSet(true, false)) {
+ CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+ if (null != stream) {
+ cancelStreamRegistry.unregisterClosable(stream);
+ IOUtils.closeQuietly(stream);
+ }
+ }
+ for (StateTableSnapshot snapshot : cowStateStableSnapshots.values()) {
+ snapshot.release();
+ }
+ }
+ };
- StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
+ AsyncStoppableTaskWithCallback<KeyGroupsStateHandle> task = AsyncStoppableTaskWithCallback.from(ioCallable);
- KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
- final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
- return new DoneFuture<>(keyGroupsStateHandle);
+ if (!asynchronousSnapshots) {
+ task.run();
}
+
+ LOG.info("Heap backend snapshot (" + streamFactory + ", synchronous part) in thread " +
+ Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms.");
+
+ return task;
}
@SuppressWarnings("deprecation")
@@ -292,42 +364,12 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- private <N, S> void writeStateTableForKeyGroup(
- DataOutputView outView,
- StateTable<K, N, S> stateTable,
- int keyGroupIndex) throws IOException {
-
- TypeSerializer<N> namespaceSerializer = stateTable.getNamespaceSerializer();
- TypeSerializer<S> stateSerializer = stateTable.getStateSerializer();
-
- Map<N, Map<K, S>> namespaceMap = stateTable.get(keyGroupIndex);
- if (namespaceMap == null) {
- outView.writeByte(0);
- } else {
- outView.writeByte(1);
-
- // number of namespaces
- outView.writeInt(namespaceMap.size());
- for (Map.Entry<N, Map<K, S>> namespace : namespaceMap.entrySet()) {
- namespaceSerializer.serialize(namespace.getKey(), outView);
-
- Map<K, S> entryMap = namespace.getValue();
-
- // number of entries
- outView.writeInt(entryMap.size());
- for (Map.Entry<K, S> entry : entryMap.entrySet()) {
- keySerializer.serialize(entry.getKey(), outView);
- stateSerializer.serialize(entry.getValue(), outView);
- }
- }
- }
- }
-
@SuppressWarnings({"unchecked"})
private void restorePartitionedState(Collection<KeyGroupsStateHandle> state) throws Exception {
+ final Map<Integer, String> kvStatesById = new HashMap<>();
int numRegisteredKvStates = 0;
- Map<Integer, String> kvStatesById = new HashMap<>();
+ stateTables.clear();
for (KeyGroupsStateHandle keyGroupsHandle : state) {
@@ -359,7 +401,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
new RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy);
- stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
+ stateTable = newStateTable(registeredBackendStateMetaInfo);
stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable);
kvStatesById.put(numRegisteredKvStates, metaInfoSerializationProxy.getStateName());
++numRegisteredKvStates;
@@ -372,20 +414,20 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
fsDataInputStream.seek(offset);
int writtenKeyGroupIndex = inView.readInt();
- assert writtenKeyGroupIndex == keyGroupIndex;
+
+ Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
+ "Unexpected key-group in restore.");
for (int i = 0; i < metaInfoList.size(); i++) {
int kvStateId = inView.readShort();
-
- byte isPresent = inView.readByte();
- if (isPresent == 0) {
- continue;
- }
-
StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId));
- Preconditions.checkNotNull(stateTable);
- readStateTableForKeyGroup(inView, stateTable, keyGroupIndex);
+ StateTableByKeyGroupReader keyGroupReader =
+ StateTableByKeyGroupReaders.readerForVersion(
+ stateTable,
+ serializationProxy.getRestoredVersion());
+
+ keyGroupReader.readMappingsInKeyGroup(inView, keyGroupIndex);
}
}
} finally {
@@ -395,38 +437,12 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- private <N, S> void readStateTableForKeyGroup(
- DataInputView inView,
- StateTable<K, N, S> stateTable,
- int keyGroupIndex) throws IOException {
-
- TypeSerializer<N> namespaceSerializer = stateTable.getNamespaceSerializer();
- TypeSerializer<S> stateSerializer = stateTable.getStateSerializer();
-
- Map<N, Map<K, S>> namespaceMap = new HashMap<>();
- stateTable.set(keyGroupIndex, namespaceMap);
-
- int numNamespaces = inView.readInt();
- for (int k = 0; k < numNamespaces; k++) {
- N namespace = namespaceSerializer.deserialize(inView);
- Map<K, S> entryMap = new HashMap<>();
- namespaceMap.put(namespace, entryMap);
-
- int numEntries = inView.readInt();
- for (int l = 0; l < numEntries; l++) {
- K key = keySerializer.deserialize(inView);
- S state = stateSerializer.deserialize(inView);
- entryMap.put(key, state);
- }
- }
- }
-
@Override
public String toString() {
return "HeapKeyedStateBackend";
}
- @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
+ @SuppressWarnings({"unchecked", "rawtypes", "DeprecatedIsStillUsed"})
@Deprecated
private void restoreOldSavepointKeyedState(
Collection<KeyGroupsStateHandle> stateHandles) throws IOException, ClassNotFoundException {
@@ -444,118 +460,18 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
for (Map.Entry<String, KvStateSnapshot<K, ?, ?, ?>> nameToState : namedStates.entrySet()) {
- KvStateSnapshot<K, ?, ?, ?> genericSnapshot = nameToState.getValue();
-
- final RestoredState restoredState;
-
- if (genericSnapshot instanceof AbstractMemStateSnapshot) {
-
- AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot =
- (AbstractMemStateSnapshot<K, ?, ?, ?, ?>) nameToState.getValue();
+ final String stateName = nameToState.getKey();
+ final KvStateSnapshot<K, ?, ?, ?> genericSnapshot = nameToState.getValue();
- restoredState = restoreHeapState(stateSnapshot);
-
- } else if (genericSnapshot instanceof AbstractFsStateSnapshot) {
-
- AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot =
- (AbstractFsStateSnapshot<K, ?, ?, ?, ?>) nameToState.getValue();
- restoredState = restoreFsState(stateSnapshot);
+ if (genericSnapshot instanceof MigrationRestoreSnapshot) {
+ MigrationRestoreSnapshot<K, ?, ?> stateSnapshot = (MigrationRestoreSnapshot<K, ?, ?>) genericSnapshot;
+ final StateTable rawResultMap =
+ stateSnapshot.deserialize(stateName, this);
+ // add named state to the backend
+ stateTables.put(stateName, rawResultMap);
} else {
throw new IllegalStateException("Unknown state: " + genericSnapshot);
}
-
- Map rawResultMap = restoredState.getRawResultMap();
- TypeSerializer<?> namespaceSerializer = restoredState.getNamespaceSerializer();
- TypeSerializer<?> stateSerializer = restoredState.getStateSerializer();
-
- if (namespaceSerializer instanceof VoidSerializer) {
- namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
- }
-
- Map nullNameSpaceFix = (Map) rawResultMap.remove(null);
-
- if (null != nullNameSpaceFix) {
- rawResultMap.put(VoidNamespace.INSTANCE, nullNameSpaceFix);
- }
-
- RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
- new RegisteredBackendStateMetaInfo<>(
- StateDescriptor.Type.UNKNOWN,
- nameToState.getKey(),
- namespaceSerializer,
- stateSerializer);
-
- StateTable<K, ?, ?> stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
- stateTable.getState()[0] = rawResultMap;
-
- // add named state to the backend
- stateTables.put(registeredBackendStateMetaInfo.getName(), stateTable);
- }
- }
-
- @SuppressWarnings("deprecation")
- private RestoredState restoreHeapState(AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
- return new RestoredState(
- stateSnapshot.deserialize(),
- stateSnapshot.getNamespaceSerializer(),
- stateSnapshot.getStateSerializer());
- }
-
- @SuppressWarnings({"rawtypes", "unchecked", "deprecation"})
- private RestoredState restoreFsState(AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
- FileSystem fs = stateSnapshot.getFilePath().getFileSystem();
- //TODO register closeable to support fast cancelation?
- try (FSDataInputStream inStream = fs.open(stateSnapshot.getFilePath())) {
-
- DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
-
- final int numNamespaces = inView.readInt();
- HashMap rawResultMap = new HashMap<>(numNamespaces);
-
- TypeSerializer<K> keySerializer = stateSnapshot.getKeySerializer();
- TypeSerializer<?> namespaceSerializer = stateSnapshot.getNamespaceSerializer();
- TypeSerializer<?> stateSerializer = stateSnapshot.getStateSerializer();
-
- for (int i = 0; i < numNamespaces; i++) {
- Object namespace = namespaceSerializer.deserialize(inView);
- final int numKV = inView.readInt();
- Map<K, Object> namespaceMap = new HashMap<>(numKV);
- rawResultMap.put(namespace, namespaceMap);
- for (int j = 0; j < numKV; j++) {
- K key = keySerializer.deserialize(inView);
- Object value = stateSerializer.deserialize(inView);
- namespaceMap.put(key, value);
- }
- }
- return new RestoredState(rawResultMap, namespaceSerializer, stateSerializer);
- } catch (Exception e) {
- throw new IOException("Failed to restore state from file system", e);
- }
- }
-
- @SuppressWarnings("rawtypes")
- static final class RestoredState {
-
- private final Map rawResultMap;
- private final TypeSerializer<?> namespaceSerializer;
- private final TypeSerializer<?> stateSerializer ;
-
- public RestoredState(Map rawResultMap, TypeSerializer<?> namespaceSerializer, TypeSerializer<?> stateSerializer) {
- this.rawResultMap = rawResultMap;
- this.namespaceSerializer = namespaceSerializer;
- this.stateSerializer = stateSerializer;
- }
-
- public Map getRawResultMap() {
- return rawResultMap;
- }
-
- public TypeSerializer<?> getNamespaceSerializer() {
- return namespaceSerializer;
- }
-
- public TypeSerializer<?> getStateSerializer() {
- return stateSerializer;
}
}
@@ -567,15 +483,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
public int numStateEntries() {
int sum = 0;
for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
- for (Map namespaceMap : stateTable.getState()) {
- if (namespaceMap == null) {
- continue;
- }
- Map<?, Map> typedMap = (Map<?, Map>) namespaceMap;
- for (Map entriesMap : typedMap.values()) {
- sum += entriesMap.size();
- }
- }
+ sum += stateTable.size();
}
return sum;
}
@@ -584,22 +492,22 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* Returns the total number of state entries across all keys for the given namespace.
*/
@VisibleForTesting
- @SuppressWarnings("unchecked")
- public <N> int numStateEntries(N namespace) {
+ public int numStateEntries(Object namespace) {
int sum = 0;
for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
- for (Map namespaceMap : stateTable.getState()) {
- if (namespaceMap == null) {
- continue;
- }
- Map<?, Map> typedMap = (Map<?, Map>) namespaceMap;
- Map singleNamespace = typedMap.get(namespace);
- if (singleNamespace != null) {
- sum += singleNamespace.size();
- }
- }
+ sum += stateTable.sizeOfNamespace(namespace);
}
return sum;
}
-}
+ public <N, V> StateTable<K, N, V> newStateTable(RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
+ return asynchronousSnapshots ?
+ new CopyOnWriteStateTable<>(this, newMetaInfo) :
+ new NestedMapsStateTable<>(this, newMetaInfo);
+ }
+
+ @Override
+ public boolean supportsAsynchronousSnapshots() {
+ return asynchronousSnapshots;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index 02c3067..d3f67f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -22,14 +22,11 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.Preconditions;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
-import java.util.Map;
/**
* Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted
@@ -46,18 +43,16 @@ public class HeapListState<K, N, V>
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param backend The state backend backing that created this state.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
* @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
*/
public HeapListState(
- KeyedStateBackend<K> backend,
ListStateDescriptor<V> stateDesc,
StateTable<K, N, ArrayList<V>> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
- super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ super(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
// ------------------------------------------------------------------------
@@ -66,55 +61,24 @@ public class HeapListState<K, N, V>
@Override
public Iterable<V> get() {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
-
- Map<N, Map<K, ArrayList<V>>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return null;
- }
-
- return keyedMap.get(backend.<K>getCurrentKey());
+ return stateTable.get(currentNamespace);
}
@Override
public void add(V value) {
- Preconditions.checkState(currentNamespace != null, "No namespace set.");
- Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
+ final N namespace = currentNamespace;
if (value == null) {
clear();
return;
}
- Map<N, Map<K, ArrayList<V>>> namespaceMap =
- stateTable.get(backend.getCurrentKeyGroupIndex());
-
- if (namespaceMap == null) {
- namespaceMap = createNewMap();
- stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
- }
-
- Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- keyedMap = createNewMap();
- namespaceMap.put(currentNamespace, keyedMap);
- }
-
- ArrayList<V> list = keyedMap.get(backend.<K>getCurrentKey());
+ final StateTable<K, N, ArrayList<V>> map = stateTable;
+ ArrayList<V> list = map.get(namespace);
if (list == null) {
list = new ArrayList<>();
- keyedMap.put(backend.<K>getCurrentKey(), list);
+ map.put(namespace, list);
}
list.add(value);
}
@@ -124,20 +88,7 @@ public class HeapListState<K, N, V>
Preconditions.checkState(namespace != null, "No namespace given.");
Preconditions.checkState(key != null, "No key given.");
- Map<N, Map<K, ArrayList<V>>> namespaceMap =
- stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
-
- if (namespaceMap == null) {
- return null;
- }
-
- Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
-
- if (keyedMap == null) {
- return null;
- }
-
- ArrayList<V> result = keyedMap.get(key);
+ ArrayList<V> result = stateTable.get(key, namespace);
if (result == null) {
return null;