You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/04/28 09:06:27 UTC
[2/2] flink git commit: [FLINK-8715] Ensure use of reconfigured
serializers in state backends
[FLINK-8715] Ensure use of reconfigured serializers in state backends
This commit consists of the following changes:
1. No longer use StateDescriptors in state handle classes. Instead,
state handles always work with serializers directly.
2. Refactor state registration paths to have better separation of
concerns.
This closes #5885.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07aa2d46
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07aa2d46
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07aa2d46
Branch: refs/heads/master
Commit: 07aa2d469e34884d715b01166db077a4cf7cf3af
Parents: ff62977
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Apr 20 21:15:42 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Apr 28 17:05:27 2018 +0800
----------------------------------------------------------------------
.../api/common/typeutils/CompatibilityUtil.java | 2 +
.../state/DefaultOperatorStateBackend.java | 5 +-
.../RegisteredKeyedBackendStateMetaInfo.java | 56 ++++
.../state/heap/AbstractHeapMergingState.java | 23 +-
.../runtime/state/heap/AbstractHeapState.java | 35 ++-
.../state/heap/HeapAggregatingState.java | 27 +-
.../runtime/state/heap/HeapFoldingState.java | 36 ++-
.../state/heap/HeapKeyedStateBackend.java | 133 ++++----
.../flink/runtime/state/heap/HeapListState.java | 20 +-
.../flink/runtime/state/heap/HeapMapState.java | 25 +-
.../runtime/state/heap/HeapReducingState.java | 26 +-
.../runtime/state/heap/HeapValueState.java | 22 +-
.../runtime/state/StateBackendTestBase.java | 314 ++++++++++++++++++-
.../streaming/state/AbstractRocksDBState.java | 31 +-
.../state/RocksDBAggregatingState.java | 28 +-
.../streaming/state/RocksDBFoldingState.java | 26 +-
.../state/RocksDBKeyedStateBackend.java | 168 +++++-----
.../streaming/state/RocksDBListState.java | 31 +-
.../streaming/state/RocksDBMapState.java | 22 +-
.../streaming/state/RocksDBReducingState.java | 24 +-
.../streaming/state/RocksDBValueState.java | 25 +-
21 files changed, 753 insertions(+), 326 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
index df7f216..6c8583c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
@@ -30,6 +30,8 @@ public class CompatibilityUtil {
/**
* Resolves the final compatibility result of two serializers by taking into account compound information,
* including the preceding serializer, the preceding serializer's configuration snapshot, and the new serializer.
+ * This method has the side effect that the provided new serializer may have been reconfigured in order to
+ * remain compatible.
*
* The final result is determined as follows:
* 1. If there is no configuration snapshot of the preceding serializer,
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 01a397a..edbd605 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -757,16 +757,17 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
(RegisteredOperatorBackendStateMetaInfo.Snapshot<S>) restoredOperatorStateMetaInfos.get(name);
// check compatibility to determine if state migration is required
+ TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
restoredMetaInfo.getPartitionStateSerializer(),
UnloadableDummyTypeSerializer.class,
restoredMetaInfo.getPartitionStateSerializerConfigSnapshot(),
- partitionStateSerializer);
+ newPartitionStateSerializer);
if (!stateCompatibility.isRequiresMigration()) {
// new serializer is compatible; use it to replace the old serializer
partitionableListState.setStateMetaInfo(
- new RegisteredOperatorBackendStateMetaInfo<>(name, partitionStateSerializer, mode));
+ new RegisteredOperatorBackendStateMetaInfo<>(name, newPartitionStateSerializer, mode));
} else {
// TODO state migration currently isn't possible.
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
index 342bc7f..c7952ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
@@ -19,9 +19,13 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
import java.util.Objects;
@@ -240,4 +244,56 @@ public class RegisteredKeyedBackendStateMetaInfo<N, S> {
return result;
}
}
+
+ /**
+ * Checks compatibility of a restored k/v state, with the new {@link StateDescriptor} provided to it.
+ * This checks that the descriptor specifies identical names and state types, as well as
+ * serializers that are compatible with the restored k/v state bytes.
+ */
+ public static <N, S> RegisteredKeyedBackendStateMetaInfo<N, S> resolveKvStateCompatibility(
+ RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredStateMetaInfoSnapshot,
+ TypeSerializer<N> newNamespaceSerializer,
+ StateDescriptor<?, S> newStateDescriptor) throws StateMigrationException {
+
+ Preconditions.checkState(
+ Objects.equals(newStateDescriptor.getName(), restoredStateMetaInfoSnapshot.getName()),
+ "Incompatible state names. " +
+ "Was [" + restoredStateMetaInfoSnapshot.getName() + "], " +
+ "registered with [" + newStateDescriptor.getName() + "].");
+
+ if (!Objects.equals(newStateDescriptor.getType(), StateDescriptor.Type.UNKNOWN)
+ && !Objects.equals(restoredStateMetaInfoSnapshot.getStateType(), StateDescriptor.Type.UNKNOWN)) {
+
+ Preconditions.checkState(
+ newStateDescriptor.getType() == restoredStateMetaInfoSnapshot.getStateType(),
+ "Incompatible state types. " +
+ "Was [" + restoredStateMetaInfoSnapshot.getStateType() + "], " +
+ "registered with [" + newStateDescriptor.getType() + "].");
+ }
+
+ // check compatibility results to determine if state migration is required
+ CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+ restoredStateMetaInfoSnapshot.getNamespaceSerializer(),
+ null,
+ restoredStateMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
+ newNamespaceSerializer);
+
+ TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
+ CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+ restoredStateMetaInfoSnapshot.getStateSerializer(),
+ UnloadableDummyTypeSerializer.class,
+ restoredStateMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
+ newStateSerializer);
+
+ if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
+ // TODO state migration currently isn't possible.
+ throw new StateMigrationException("State migration isn't supported, yet.");
+ } else {
+ return new RegisteredKeyedBackendStateMetaInfo<>(
+ newStateDescriptor.getType(),
+ newStateDescriptor.getName(),
+ newNamespaceSerializer,
+ newStateSerializer);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
index df762b4..2ecff46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalMergingState;
@@ -36,10 +35,9 @@ import java.util.Collection;
* @param <SV> The type of the values in the state.
* @param <OUT> The type of the output elements.
* @param <S> The type of State
- * @param <SD> The type of StateDescriptor for the State S
*/
-public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT, S extends State, SD extends StateDescriptor<S, SV>>
- extends AbstractHeapState<K, N, SV, S, SD>
+public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT, S extends State>
+ extends AbstractHeapState<K, N, SV, S>
implements InternalMergingState<K, N, IN, SV, OUT> {
/**
@@ -50,17 +48,20 @@ public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT, S extends Stat
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param stateDesc The state identifier for the state. This contains name
- * and can create a default state value.
- * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+ * @param stateTable The state table to which this state is associated.
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the state.
+ * @param namespaceSerializer The serializer for the namespace.
+ * @param defaultValue The default value for the state.
*/
protected AbstractHeapMergingState(
- SD stateDesc,
StateTable<K, N, SV> stateTable,
TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer) {
+ TypeSerializer<SV> valueSerializer,
+ TypeSerializer<N> namespaceSerializer,
+ SV defaultValue) {
- super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
this.mergeTransformation = new MergeTransformation();
}
@@ -106,4 +107,4 @@ public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT, S extends Stat
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index e889c53..f824e84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -35,40 +34,44 @@ import org.apache.flink.util.Preconditions;
* @param <N> The type of the namespace.
* @param <SV> The type of the values in the state.
* @param <S> The type of State
- * @param <SD> The type of StateDescriptor for the State S
*/
-public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> implements InternalKvState<K, N, SV> {
+public abstract class AbstractHeapState<K, N, SV, S extends State> implements InternalKvState<K, N, SV> {
/** Map containing the actual key/value pairs. */
protected final StateTable<K, N, SV> stateTable;
- /** This holds the name of the state and can create an initial default value for the state. */
- protected final SD stateDesc;
-
/** The current namespace, which the access methods will refer to. */
protected N currentNamespace;
protected final TypeSerializer<K> keySerializer;
+ protected final TypeSerializer<SV> valueSerializer;
+
protected final TypeSerializer<N> namespaceSerializer;
+ private final SV defaultValue;
+
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param stateDesc The state identifier for the state. This contains name
- * and can create a default state value.
- * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+ * @param stateTable The state table to which this state is associated.
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the state.
+ * @param namespaceSerializer The serializer for the namespace.
+ * @param defaultValue The default value for the state.
*/
protected AbstractHeapState(
- SD stateDesc,
StateTable<K, N, SV> stateTable,
TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer) {
+ TypeSerializer<SV> valueSerializer,
+ TypeSerializer<N> namespaceSerializer,
+ SV defaultValue) {
- this.stateDesc = stateDesc;
this.stateTable = Preconditions.checkNotNull(stateTable, "State table must not be null.");
this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
this.namespaceSerializer = namespaceSerializer;
+ this.defaultValue = defaultValue;
this.currentNamespace = null;
}
@@ -114,4 +117,12 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
public StateTable<K, N, SV> getStateTable() {
return stateTable;
}
+
+ protected SV getDefaultValue() {
+ if (defaultValue != null) {
+ return valueSerializer.copy(defaultValue);
+ } else {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 8e58ac8..84a6d57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
@@ -39,7 +38,7 @@ import java.io.IOException;
* @param <OUT> The type of the value returned from the state.
*/
public class HeapAggregatingState<K, N, IN, ACC, OUT>
- extends AbstractHeapMergingState<K, N, IN, ACC, OUT, AggregatingState<IN, OUT>, AggregatingStateDescriptor<IN, ACC, OUT>>
+ extends AbstractHeapMergingState<K, N, IN, ACC, OUT, AggregatingState<IN, OUT>>
implements InternalAggregatingState<K, N, IN, ACC, OUT> {
private final AggregateTransformation<IN, ACC, OUT> aggregateTransformation;
@@ -47,21 +46,23 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param stateDesc
- * The state identifier for the state. This contains name and can create a default state value.
- * @param stateTable
- * The state table to use in this kev/value state. May contain initial state.
- * @param namespaceSerializer
- * The serializer for the type that indicates the namespace
+ * @param stateTable The state table to which this state is associated.
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the state.
+ * @param namespaceSerializer The serializer for the namespace.
+ * @param defaultValue The default value for the state.
+ * @param aggregateFunction The aggregating function used for aggregating state.
*/
public HeapAggregatingState(
- AggregatingStateDescriptor<IN, ACC, OUT> stateDesc,
StateTable<K, N, ACC> stateTable,
TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer) {
+ TypeSerializer<ACC> valueSerializer,
+ TypeSerializer<N> namespaceSerializer,
+ ACC defaultValue,
+ AggregateFunction<IN, ACC, OUT> aggregateFunction) {
- super(stateDesc, stateTable, keySerializer, namespaceSerializer);
- this.aggregateTransformation = new AggregateTransformation<>(stateDesc.getAggregateFunction());
+ super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
+ this.aggregateTransformation = new AggregateTransformation<>(aggregateFunction);
}
@Override
@@ -76,7 +77,7 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
@Override
public TypeSerializer<ACC> getValueSerializer() {
- return stateDesc.getSerializer();
+ return valueSerializer;
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index ed1d0de..3fa0a5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -40,26 +39,31 @@ import java.io.IOException;
*/
@Deprecated
public class HeapFoldingState<K, N, T, ACC>
- extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
+ extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>>
implements InternalFoldingState<K, N, T, ACC> {
/** The function used to fold the state */
- private final FoldTransformation<T, ACC> foldTransformation;
+ private final FoldTransformation foldTransformation;
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param stateDesc The state identifier for the state. This contains name
- * and can create a default state value.
- * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+ * @param stateTable The state table to which this state is associated.
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the state.
+ * @param namespaceSerializer The serializer for the namespace.
+ * @param defaultValue The default value for the state.
+ * @param foldFunction The fold function used for folding state.
*/
public HeapFoldingState(
- FoldingStateDescriptor<T, ACC> stateDesc,
StateTable<K, N, ACC> stateTable,
TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer) {
- super(stateDesc, stateTable, keySerializer, namespaceSerializer);
- this.foldTransformation = new FoldTransformation<>(stateDesc);
+ TypeSerializer<ACC> valueSerializer,
+ TypeSerializer<N> namespaceSerializer,
+ ACC defaultValue,
+ FoldFunction<T, ACC> foldFunction) {
+ super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
+ this.foldTransformation = new FoldTransformation(foldFunction);
}
@Override
@@ -74,7 +78,7 @@ public class HeapFoldingState<K, N, T, ACC>
@Override
public TypeSerializer<ACC> getValueSerializer() {
- return stateDesc.getSerializer();
+ return valueSerializer;
}
// ------------------------------------------------------------------------
@@ -101,19 +105,17 @@ public class HeapFoldingState<K, N, T, ACC>
}
}
- private static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
+ private final class FoldTransformation implements StateTransformationFunction<ACC, T> {
- private final FoldingStateDescriptor<T, ACC> stateDescriptor;
private final FoldFunction<T, ACC> foldFunction;
- FoldTransformation(FoldingStateDescriptor<T, ACC> stateDesc) {
- this.stateDescriptor = Preconditions.checkNotNull(stateDesc);
- this.foldFunction = Preconditions.checkNotNull(stateDesc.getFoldFunction());
+ FoldTransformation(FoldFunction<T, ACC> foldFunction) {
+ this.foldFunction = Preconditions.checkNotNull(foldFunction);
}
@Override
public ACC apply(ACC previousState, T value) throws Exception {
- return foldFunction.fold((previousState != null) ? previousState : stateDescriptor.getDefaultValue(), value);
+ return foldFunction.fold((previousState != null) ? previousState : getDefaultValue(), value);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 10803e2..568ab3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
@@ -157,75 +156,35 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private <N, V> StateTable<K, N, V> tryRegisterStateTable(
TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) throws StateMigrationException {
- return tryRegisterStateTable(
- stateDesc.getName(), stateDesc.getType(),
- namespaceSerializer, stateDesc.getSerializer());
- }
+ @SuppressWarnings("unchecked")
+ StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateDesc.getName());
- private <N, V> StateTable<K, N, V> tryRegisterStateTable(
- String stateName,
- StateDescriptor.Type stateType,
- TypeSerializer<N> namespaceSerializer,
- TypeSerializer<V> valueSerializer) throws StateMigrationException {
+ RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo;
+ if (stateTable != null) {
+ @SuppressWarnings("unchecked")
+ RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfoSnapshot =
+ (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) restoredKvStateMetaInfos.get(stateDesc.getName());
- final RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo =
- new RegisteredKeyedBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
+ Preconditions.checkState(
+ restoredMetaInfoSnapshot != null,
+ "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
+ " but its corresponding restored snapshot cannot be found.");
- @SuppressWarnings("unchecked")
- StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
+ newMetaInfo = RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
+ restoredMetaInfoSnapshot,
+ namespaceSerializer,
+ stateDesc);
- if (stateTable == null) {
- stateTable = snapshotStrategy.newStateTable(newMetaInfo);
- stateTables.put(stateName, stateTable);
+ stateTable.setMetaInfo(newMetaInfo);
} else {
- // TODO with eager registration in place, these checks should be moved to restorePartitionedState()
+ newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+ stateDesc.getType(),
+ stateDesc.getName(),
+ namespaceSerializer,
+ stateDesc.getSerializer());
- Preconditions.checkState(
- stateName.equals(stateTable.getMetaInfo().getName()),
- "Incompatible state names. " +
- "Was [" + stateTable.getMetaInfo().getName() + "], " +
- "registered with [" + newMetaInfo.getName() + "].");
-
- if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
- && !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
-
- Preconditions.checkState(
- newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()),
- "Incompatible state types. " +
- "Was [" + stateTable.getMetaInfo().getStateType() + "], " +
- "registered with [" + newMetaInfo.getStateType() + "].");
- }
-
- @SuppressWarnings("unchecked")
- RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfo =
- (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) restoredKvStateMetaInfos.get(stateName);
-
- // check compatibility results to determine if state migration is required
- CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
- restoredMetaInfo.getNamespaceSerializer(),
- null,
- restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
- newMetaInfo.getNamespaceSerializer());
-
- CompatibilityResult<V> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
- restoredMetaInfo.getStateSerializer(),
- UnloadableDummyTypeSerializer.class,
- restoredMetaInfo.getStateSerializerConfigSnapshot(),
- newMetaInfo.getStateSerializer());
-
- if (!namespaceCompatibility.isRequiresMigration() && !stateCompatibility.isRequiresMigration()) {
- // new serializers are compatible; use them to replace the old serializers
- stateTable.setMetaInfo(newMetaInfo);
- } else {
- // TODO state migration currently isn't possible.
-
- // NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
- // since the state has already been deserialized to objects and we can just continue with
- // the new serializer; we're deliberately failing here for now to have equal functionality with
- // the RocksDB backend to avoid confusion for users.
-
- throw new StateMigrationException("State migration isn't supported, yet.");
- }
+ stateTable = snapshotStrategy.newStateTable(newMetaInfo);
+ stateTables.put(stateDesc.getName(), stateTable);
}
return stateTable;
@@ -250,7 +209,12 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ValueStateDescriptor<V> stateDesc) throws Exception {
StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapValueState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapValueState<>(
+ stateTable,
+ keySerializer,
+ stateTable.getStateSerializer(),
+ stateTable.getNamespaceSerializer(),
+ stateDesc.getDefaultValue());
}
@Override
@@ -259,7 +223,12 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ListStateDescriptor<T> stateDesc) throws Exception {
StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapListState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapListState<>(
+ stateTable,
+ keySerializer,
+ stateTable.getStateSerializer(),
+ stateTable.getNamespaceSerializer(),
+ stateDesc.getDefaultValue());
}
@Override
@@ -268,7 +237,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ReducingStateDescriptor<T> stateDesc) throws Exception {
StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapReducingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapReducingState<>(
+ stateTable,
+ keySerializer,
+ stateTable.getStateSerializer(),
+ stateTable.getNamespaceSerializer(),
+ stateDesc.getDefaultValue(),
+ stateDesc.getReduceFunction());
}
@Override
@@ -277,7 +252,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapAggregatingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapAggregatingState<>(
+ stateTable,
+ keySerializer,
+ stateTable.getStateSerializer(),
+ stateTable.getNamespaceSerializer(),
+ stateDesc.getDefaultValue(),
+ stateDesc.getAggregateFunction());
}
@Override
@@ -286,7 +267,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapFoldingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ return new HeapFoldingState<>(
+ stateTable,
+ keySerializer,
+ stateTable.getStateSerializer(),
+ stateTable.getNamespaceSerializer(),
+ stateDesc.getDefaultValue(),
+ stateDesc.getFoldFunction());
}
@Override
@@ -295,7 +282,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
MapStateDescriptor<UK, UV> stateDesc) throws Exception {
StateTable<K, N, Map<UK, UV>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapMapState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+
+ return new HeapMapState<>(
+ stateTable,
+ keySerializer,
+ stateTable.getStateSerializer(),
+ stateTable.getNamespaceSerializer(),
+ stateDesc.getDefaultValue());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index bd68560..5e4e471 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -40,22 +39,25 @@ import java.util.List;
* @param <V> The type of the value.
*/
public class HeapListState<K, N, V>
- extends AbstractHeapMergingState<K, N, V, List<V>, Iterable<V>, ListState<V>, ListStateDescriptor<V>>
+ extends AbstractHeapMergingState<K, N, V, List<V>, Iterable<V>, ListState<V>>
implements InternalListState<K, N, V> {
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param stateDesc The state identifier for the state. This contains name
- * and can create a default state value.
- * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+ * @param stateTable The state table to which this state is associated.
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the state.
+ * @param namespaceSerializer The serializer for the namespace.
+ * @param defaultValue The default value for the state.
*/
public HeapListState(
- ListStateDescriptor<V> stateDesc,
StateTable<K, N, List<V>> stateTable,
TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer) {
- super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ TypeSerializer<List<V>> valueSerializer,
+ TypeSerializer<N> namespaceSerializer,
+ List<V> defaultValue) {
+ super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
}
@Override
@@ -70,7 +72,7 @@ public class HeapListState<K, N, V>
@Override
public TypeSerializer<List<V>> getValueSerializer() {
- return stateDesc.getSerializer();
+ return valueSerializer;
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index ccd017f..36743e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -40,22 +39,27 @@ import java.util.Map;
* @param <UV> The type of the values in the state.
*/
public class HeapMapState<K, N, UK, UV>
- extends AbstractHeapState<K, N, Map<UK, UV>, MapState<UK, UV>, MapStateDescriptor<UK, UV>>
+ extends AbstractHeapState<K, N, Map<UK, UV>, MapState<UK, UV>>
implements InternalMapState<K, N, UK, UV> {
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param stateDesc The state identifier for the state. This contains name
- * and can create a default state value.
- * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+ * @param stateTable The state table to which this state is associated.
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the state.
+ * @param namespaceSerializer The serializer for the namespace.
+ * @param defaultValue The default value for the state.
*/
public HeapMapState(
- MapStateDescriptor<UK, UV> stateDesc,
StateTable<K, N, Map<UK, UV>> stateTable,
TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer) {
- super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ TypeSerializer<Map<UK, UV>> valueSerializer,
+ TypeSerializer<N> namespaceSerializer,
+ Map<UK, UV> defaultValue) {
+ super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
+
+ Preconditions.checkState(valueSerializer instanceof MapSerializer, "Unexpected serializer type.");
}
@Override
@@ -70,10 +74,7 @@ public class HeapMapState<K, N, UK, UV>
@Override
public TypeSerializer<Map<UK, UV>> getValueSerializer() {
- return new MapSerializer<>(
- stateDesc.getKeySerializer(),
- stateDesc.getValueSerializer()
- );
+ return valueSerializer;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
index 58b3128..47f6279 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalReducingState;
@@ -36,7 +35,7 @@ import java.io.IOException;
* @param <V> The type of the value.
*/
public class HeapReducingState<K, N, V>
- extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>, ReducingStateDescriptor<V>>
+ extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>>
implements InternalReducingState<K, N, V> {
private final ReduceTransformation<V> reduceTransformation;
@@ -44,18 +43,23 @@ public class HeapReducingState<K, N, V>
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param stateDesc The state identifier for the state. This contains name
- * and can create a default state value.
- * @param stateTable The state table to use in this kev/value state. May contain initial state.
+ * @param stateTable The state table to which this state is associated.
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the state.
+ * @param namespaceSerializer The serializer for the namespace.
+ * @param defaultValue The default value for the state.
+ * @param reduceFunction The reduce function used for reducing state.
*/
public HeapReducingState(
- ReducingStateDescriptor<V> stateDesc,
StateTable<K, N, V> stateTable,
TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer) {
+ TypeSerializer<V> valueSerializer,
+ TypeSerializer<N> namespaceSerializer,
+ V defaultValue,
+ ReduceFunction<V> reduceFunction) {
- super(stateDesc, stateTable, keySerializer, namespaceSerializer);
- this.reduceTransformation = new ReduceTransformation<>(stateDesc.getReduceFunction());
+ super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
+ this.reduceTransformation = new ReduceTransformation<>(reduceFunction);
}
@Override
@@ -70,7 +74,7 @@ public class HeapReducingState<K, N, V>
@Override
public TypeSerializer<V> getValueSerializer() {
- return stateDesc.getSerializer();
+ return valueSerializer;
}
// ------------------------------------------------------------------------
@@ -119,4 +123,4 @@ public class HeapReducingState<K, N, V>
return previousState != null ? reduceFunction.reduce(previousState, value) : value;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
index bf0a3cf..95e81e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalValueState;
@@ -31,22 +30,25 @@ import org.apache.flink.runtime.state.internal.InternalValueState;
* @param <V> The type of the value.
*/
public class HeapValueState<K, N, V>
- extends AbstractHeapState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
+ extends AbstractHeapState<K, N, V, ValueState<V>>
implements InternalValueState<K, N, V> {
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
- * @param stateDesc The state identifier for the state. This contains name
- * and can create a default state value.
- * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+ * @param stateTable The state table to which this state is associated.
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the state.
+ * @param namespaceSerializer The serializer for the namespace.
+ * @param defaultValue The default value for the state.
*/
public HeapValueState(
- ValueStateDescriptor<V> stateDesc,
StateTable<K, N, V> stateTable,
TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer) {
- super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+ TypeSerializer<V> valueSerializer,
+ TypeSerializer<N> namespaceSerializer,
+ V defaultValue) {
+ super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
}
@Override
@@ -61,7 +63,7 @@ public class HeapValueState<K, N, V>
@Override
public TypeSerializer<V> getValueSerializer() {
- return stateDesc.getSerializer();
+ return valueSerializer;
}
@Override
@@ -69,7 +71,7 @@ public class HeapValueState<K, N, V>
final V result = stateTable.get(currentNamespace);
if (result == null) {
- return stateDesc.getDefaultValue();
+ return getDefaultValue();
}
return result;
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 41df1b3..5737964 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -40,7 +40,10 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -51,6 +54,8 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
@@ -82,6 +87,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -90,6 +96,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.Random;
import java.util.Timer;
@@ -759,6 +766,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
@Test
+ @SuppressWarnings("unchecked")
public void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
Environment env = new DummyEnvironment();
@@ -777,6 +785,15 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ // access the internal state representation to retrieve the original Kryo registration ids;
+ // these will be later used to check that on restore, the new Kryo serializer has reconfigured itself to
+ // have identical mappings
+ InternalKvState internalKvState = (InternalKvState) state;
+ KryoSerializer<TestPojo> kryoSerializer = (KryoSerializer<TestPojo>) internalKvState.getValueSerializer();
+ int mainPojoClassRegistrationId = kryoSerializer.getKryo().getRegistration(TestPojo.class).getId();
+ int nestedPojoClassARegistrationId = kryoSerializer.getKryo().getRegistration(TestNestedPojoClassA.class).getId();
+ int nestedPojoClassBRegistrationId = kryoSerializer.getKryo().getRegistration(TestNestedPojoClassB.class).getId();
+
// ============== create snapshot of current configuration ==============
// make some more modifications
@@ -810,6 +827,14 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
kvId = new ValueStateDescriptor<>("id", pojoType);
state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ // verify that on restore, the serializer that the state handle uses has reconfigured itself to have
+ // identical Kryo registration ids compared to the previous execution
+ internalKvState = (InternalKvState) state;
+ kryoSerializer = (KryoSerializer<TestPojo>) internalKvState.getValueSerializer();
+ assertEquals(mainPojoClassRegistrationId, kryoSerializer.getKryo().getRegistration(TestPojo.class).getId());
+ assertEquals(nestedPojoClassARegistrationId, kryoSerializer.getKryo().getRegistration(TestNestedPojoClassA.class).getId());
+ assertEquals(nestedPojoClassBRegistrationId, kryoSerializer.getKryo().getRegistration(TestNestedPojoClassB.class).getId());
+
backend.setCurrentKey(1);
// update to test state backends that eagerly serialize, such as RocksDB
@@ -893,6 +918,110 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
@Test
+ public void testStateSerializerReconfiguration() throws Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+ Environment env = new DummyEnvironment();
+
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+ try {
+ ValueStateDescriptor<TestCustomStateClass> kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializer());
+ ValueState<TestCustomStateClass> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ // ============== create snapshot, using the non-reconfigured serializer ==============
+
+ // make some modifications
+ backend.setCurrentKey(1);
+ state.update(new TestCustomStateClass("test-message-1", "this-should-be-ignored"));
+
+ backend.setCurrentKey(2);
+ state.update(new TestCustomStateClass("test-message-2", "this-should-be-ignored"));
+
+ // verify our assumption that the serializer is not yet reconfigured;
+ // we cast the state handle to the internal representation in order to retrieve the serializer
+ InternalKvState internal = (InternalKvState) state;
+ assertTrue(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer);
+ assertFalse(((TestReconfigurableCustomTypeSerializer) internal.getValueSerializer()).isReconfigured());
+
+ KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(
+ 682375462378L,
+ 2,
+ streamFactory,
+ CheckpointOptions.forCheckpointWithDefaultLocation()));
+
+ snapshot1.registerSharedStates(sharedStateRegistry);
+ backend.dispose();
+
+ // ========== restore snapshot, which should reconfigure the serializer, and then create a snapshot again ==========
+
+ env = new DummyEnvironment();
+
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1, env);
+
+ kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializer());
+ state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ // verify that the serializer used is correctly reconfigured
+ internal = (InternalKvState) state;
+ assertTrue(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer);
+ assertTrue(((TestReconfigurableCustomTypeSerializer) internal.getValueSerializer()).isReconfigured());
+
+ backend.setCurrentKey(1);
+ TestCustomStateClass restoredState1 = state.value();
+ assertEquals("test-message-1", restoredState1.getMessage());
+ // the previous serializer schema does not contain the extra message
+ assertNull(restoredState1.getExtraMessage());
+
+ state.update(new TestCustomStateClass("new-test-message-1", "extra-message-1"));
+
+ backend.setCurrentKey(2);
+ TestCustomStateClass restoredState2 = state.value();
+ assertEquals("test-message-2", restoredState2.getMessage());
+ assertNull(restoredState2.getExtraMessage());
+
+ state.update(new TestCustomStateClass("new-test-message-2", "extra-message-2"));
+
+ KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(
+ 682375462379L,
+ 3,
+ streamFactory,
+ CheckpointOptions.forCheckpointWithDefaultLocation()));
+
+ snapshot2.registerSharedStates(sharedStateRegistry);
+ snapshot1.discardState();
+ backend.dispose();
+
+ // ========== restore snapshot again; state should now be in the new schema containing the extra message ==========
+
+ env = new DummyEnvironment();
+
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
+
+ snapshot2.discardState();
+
+ kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializer());
+ state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ internal = (InternalKvState) state;
+ assertTrue(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer);
+ assertTrue(((TestReconfigurableCustomTypeSerializer) internal.getValueSerializer()).isReconfigured());
+
+ backend.setCurrentKey(1);
+ restoredState1 = state.value();
+ assertEquals("new-test-message-1", restoredState1.getMessage());
+ assertEquals("extra-message-1", restoredState1.getExtraMessage());
+
+ backend.setCurrentKey(2);
+ restoredState2 = state.value();
+ assertEquals("new-test-message-2", restoredState2.getMessage());
+ assertEquals("extra-message-2", restoredState2.getExtraMessage());
+ } finally {
+ backend.dispose();
+ }
+ }
+
+ @Test
@SuppressWarnings("unchecked")
public void testValueState() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
@@ -3074,7 +3203,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
state.update(121818273);
- StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?,? ,?, ?>) kvState).getStateTable();
+ StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ? ,?>) kvState).getStateTable();
checkConcurrentStateTable(stateTable, numberOfKeyGroups);
}
@@ -3096,7 +3225,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
state.add(121818273);
- StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?,? ,?, ?>) kvState).getStateTable();
+ StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ? , ?>) kvState).getStateTable();
checkConcurrentStateTable(stateTable, numberOfKeyGroups);
}
@@ -3123,7 +3252,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
state.add(121818273);
- StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?,? ,?, ?>) kvState).getStateTable();
+ StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ? ,?>) kvState).getStateTable();
checkConcurrentStateTable(stateTable, numberOfKeyGroups);
}
@@ -3150,7 +3279,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.setCurrentKey(1);
state.add(121818273);
- StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?,? ,?, ?>) kvState).getStateTable();
+ StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ? ,?>) kvState).getStateTable();
checkConcurrentStateTable(stateTable, numberOfKeyGroups);
}
@@ -3896,6 +4025,183 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
/**
+ * Custom state class used for testing state serializer schema migration.
+ * The corresponding serializer used in the tests is {@link TestReconfigurableCustomTypeSerializer}.
+ */
+ public static class TestCustomStateClass {
+
+ private String message;
+ private String extraMessage;
+
+ public TestCustomStateClass(String message, String extraMessage) {
+ this.message = message;
+ this.extraMessage = extraMessage;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public String getExtraMessage() {
+ return extraMessage;
+ }
+
+ public void setExtraMessage(String extraMessage) {
+ this.extraMessage = extraMessage;
+ }
+ }
+
+ /**
+ * A reconfigurable serializer that simulates backwards compatible schema evolution for the {@link TestCustomStateClass}.
+ * A flag is maintained to determine whether or not the serializer has been reconfigured.
+ * Whether or not it has been reconfigured affects which fields of {@link TestCustomStateClass} instances are
+ * written and read on serialization.
+ */
+ public static class TestReconfigurableCustomTypeSerializer extends TypeSerializer<TestCustomStateClass> {
+
+ private boolean reconfigured = false;
+
+ public TestReconfigurableCustomTypeSerializer() {}
+
+ /** Copy constructor. */
+ private TestReconfigurableCustomTypeSerializer(boolean reconfigured) {
+ this.reconfigured = reconfigured;
+ }
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ return new ParameterlessTypeSerializerConfig(getClass().getName());
+ }
+
+ @Override
+ public CompatibilityResult<TestCustomStateClass> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ if (configSnapshot instanceof ParameterlessTypeSerializerConfig &&
+ ((ParameterlessTypeSerializerConfig) configSnapshot).getSerializationFormatIdentifier().equals(getClass().getName())) {
+
+ this.reconfigured = true;
+ return CompatibilityResult.compatible();
+ } else {
+ return CompatibilityResult.requiresMigration();
+ }
+ }
+
+ @Override
+ public TypeSerializer<TestCustomStateClass> duplicate() {
+ return new TestReconfigurableCustomTypeSerializer(reconfigured);
+ }
+
+ @Override
+ public TestCustomStateClass createInstance() {
+ return new TestCustomStateClass(null, null);
+ }
+
+ @Override
+ public void serialize(TestCustomStateClass record, DataOutputView target) throws IOException {
+ target.writeBoolean(reconfigured);
+
+ target.writeUTF(record.getMessage());
+ if (reconfigured) {
+ target.writeUTF(record.getExtraMessage());
+ }
+ }
+
+ @Override
+ public TestCustomStateClass deserialize(DataInputView source) throws IOException {
+ boolean isNewSchema = source.readBoolean();
+
+ String message = source.readUTF();
+ if (isNewSchema) {
+ return new TestCustomStateClass(message, source.readUTF());
+ } else {
+ return new TestCustomStateClass(message, null);
+ }
+ }
+
+ @Override
+ public TestCustomStateClass deserialize(TestCustomStateClass reuse, DataInputView source) throws IOException {
+ boolean isNewSchema = source.readBoolean();
+
+ String message = source.readUTF();
+ if (isNewSchema) {
+ reuse.setMessage(message);
+ reuse.setExtraMessage(source.readUTF());
+ return reuse;
+ } else {
+ reuse.setMessage(message);
+ reuse.setExtraMessage(null);
+ return reuse;
+ }
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ boolean reconfigured = source.readBoolean();
+
+ target.writeUTF(source.readUTF());
+ if (reconfigured)
+ target.writeUTF(source.readUTF());
+ }
+
+ @Override
+ public TestCustomStateClass copy(TestCustomStateClass from) {
+ return new TestCustomStateClass(from.getMessage(), from.getExtraMessage());
+ }
+
+ @Override
+ public TestCustomStateClass copy(TestCustomStateClass from, TestCustomStateClass reuse) {
+ reuse.setMessage(from.getMessage());
+ reuse.setExtraMessage(from.getExtraMessage());
+ return reuse;
+ }
+
+ @Override
+ public int getLength() {
+ return 0;
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof TestReconfigurableCustomTypeSerializer;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+
+ if (!(obj instanceof TestReconfigurableCustomTypeSerializer)) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ } else {
+ TestReconfigurableCustomTypeSerializer other = (TestReconfigurableCustomTypeSerializer) obj;
+ return other.reconfigured == this.reconfigured;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass().getName(), reconfigured);
+ }
+
+ public boolean isReconfigured() {
+ return reconfigured;
+ }
+ }
+
+ /**
* We throw this in our {@link ExceptionThrowingTestSerializer}.
*/
private static class ExpectedKryoTestException extends RuntimeException {}
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 8c59979..92c9e80 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -18,7 +18,6 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
@@ -45,13 +44,15 @@ import java.io.IOException;
* @param <N> The type of the namespace.
* @param <V> The type of values kept internally in state.
* @param <S> The type of {@link State}.
- * @param <SD> The type of {@link StateDescriptor}.
*/
-public abstract class AbstractRocksDBState<K, N, V, S extends State, SD extends StateDescriptor<S, V>> implements InternalKvState<K, N, V>, State {
+public abstract class AbstractRocksDBState<K, N, V, S extends State> implements InternalKvState<K, N, V>, State {
/** Serializer for the namespace. */
final TypeSerializer<N> namespaceSerializer;
+ /** Serializer for the state values. */
+ final TypeSerializer<V> valueSerializer;
+
/** The current namespace, which the next value methods will refer to. */
private N currentNamespace;
@@ -61,8 +62,7 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State, SD extends
/** The column family of this particular instance of state. */
protected ColumnFamilyHandle columnFamily;
- /** State descriptor from which to create this state instance. */
- protected final SD stateDesc;
+ protected final V defaultValue;
protected final WriteOptions writeOptions;
@@ -74,12 +74,18 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State, SD extends
/**
* Creates a new RocksDB backed state.
- * @param namespaceSerializer The serializer for the namespace.
+ *
+ * @param columnFamily The RocksDB column family that this state is associated to.
+ * @param namespaceSerializer The serializer for the namespace.
+ * @param valueSerializer The serializer for the state.
+ * @param defaultValue The default value for the state.
+ * @param backend The backend to which this state is bound.
*/
protected AbstractRocksDBState(
ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
- SD stateDesc,
+ TypeSerializer<V> valueSerializer,
+ V defaultValue,
RocksDBKeyedStateBackend<K> backend) {
this.namespaceSerializer = namespaceSerializer;
@@ -88,7 +94,8 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State, SD extends
this.columnFamily = columnFamily;
this.writeOptions = backend.getWriteOptions();
- this.stateDesc = Preconditions.checkNotNull(stateDesc, "State Descriptor");
+ this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "State value serializer");
+ this.defaultValue = defaultValue;
this.keySerializationStream = new ByteArrayOutputStreamWithPos(128);
this.keySerializationDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream);
@@ -190,4 +197,12 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State, SD extends
RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
}
+
+ protected V getDefaultValue() {
+ if (defaultValue != null) {
+ return valueSerializer.copy(defaultValue);
+ } else {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 14671a5..f8d7980 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -20,7 +20,6 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -43,33 +42,32 @@ import java.util.Collection;
* @param <R> The type of the value returned from the state
*/
public class RocksDBAggregatingState<K, N, T, ACC, R>
- extends AbstractRocksDBState<K, N, ACC, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>>
+ extends AbstractRocksDBState<K, N, ACC, AggregatingState<T, R>>
implements InternalAggregatingState<K, N, T, ACC, R> {
- /** Serializer for the values. */
- private final TypeSerializer<ACC> valueSerializer;
-
/** User-specified aggregation function. */
private final AggregateFunction<T, ACC, R> aggFunction;
/**
* Creates a new {@code RocksDBAggregatingState}.
*
- * @param namespaceSerializer
- * The serializer for the namespace.
- * @param stateDesc
- * The state identifier for the state. This contains the state name and aggregation function.
+ * @param columnFamily The RocksDB column family that this state is associated to.
+ * @param namespaceSerializer The serializer for the namespace.
+ * @param valueSerializer The serializer for the state.
+ * @param defaultValue The default value for the state.
+ * @param aggFunction The aggregate function used for aggregating state.
+ * @param backend The backend to which this state is bound.
*/
public RocksDBAggregatingState(
ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
- AggregatingStateDescriptor<T, ACC, R> stateDesc,
+ TypeSerializer<ACC> valueSerializer,
+ ACC defaultValue,
+ AggregateFunction<T, ACC, R> aggFunction,
RocksDBKeyedStateBackend<K> backend) {
- super(columnFamily, namespaceSerializer, stateDesc, backend);
-
- this.valueSerializer = stateDesc.getSerializer();
- this.aggFunction = stateDesc.getAggregateFunction();
+ super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
+ this.aggFunction = aggFunction;
}
@Override
@@ -84,7 +82,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
@Override
public TypeSerializer<ACC> getValueSerializer() {
- return stateDesc.getSerializer();
+ return valueSerializer;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 6f5baff..4bc6bf9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -20,7 +20,6 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -44,31 +43,32 @@ import java.io.IOException;
*/
@Deprecated
public class RocksDBFoldingState<K, N, T, ACC>
- extends AbstractRocksDBState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
+ extends AbstractRocksDBState<K, N, ACC, FoldingState<T, ACC>>
implements InternalFoldingState<K, N, T, ACC> {
- /** Serializer for the values. */
- private final TypeSerializer<ACC> valueSerializer;
-
/** User-specified fold function. */
private final FoldFunction<T, ACC> foldFunction;
/**
* Creates a new {@code RocksDBFoldingState}.
*
+ * @param columnFamily The RocksDB column family that this state is associated to.
* @param namespaceSerializer The serializer for the namespace.
- * @param stateDesc The state identifier for the state. This contains name
- * and can create a default state value.
+ * @param valueSerializer The serializer for the state.
+ * @param defaultValue The default value for the state.
+ * @param foldFunction The fold function used for folding state.
+ * @param backend The backend to which this state is bound.
*/
public RocksDBFoldingState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
- FoldingStateDescriptor<T, ACC> stateDesc,
+ TypeSerializer<ACC> valueSerializer,
+ ACC defaultValue,
+ FoldFunction<T, ACC> foldFunction,
RocksDBKeyedStateBackend<K> backend) {
- super(columnFamily, namespaceSerializer, stateDesc, backend);
+ super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
- this.valueSerializer = stateDesc.getSerializer();
- this.foldFunction = stateDesc.getFoldFunction();
+ this.foldFunction = foldFunction;
}
@Override
@@ -83,7 +83,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
@Override
public TypeSerializer<ACC> getValueSerializer() {
- return stateDesc.getSerializer();
+ return valueSerializer;
}
@Override
@@ -110,7 +110,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
if (valueBytes == null) {
keySerializationStream.reset();
- valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
+ valueSerializer.serialize(foldFunction.fold(getDefaultValue(), value), out);
backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
} else {
ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 69ce95c..2990212 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
@@ -1116,89 +1115,71 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// ------------------------------------------------------------------------
/**
- * Creates a column family handle for use with a k/v state. When restoring from a snapshot
- * we don't restore the individual k/v states, just the global RocksDB database and the
- * list of column families. When a k/v state is first requested we check here whether we
- * already have a column family for that and return it or create a new one if it doesn't exist.
+ * Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers.
*
- * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
- * that we checkpointed, i.e. is already in the map of column families.
+ * <p>When restoring from a snapshot, we don't restore the individual k/v states, just the global RocksDB database and
+ * the list of k/v state information. When a k/v state is first requested we check here whether we
+ * already have a registered entry for that and return it (after some necessary state compatibility checks)
+ * or create a new one if it does not exist.
*/
- @SuppressWarnings("rawtypes, unchecked")
- protected <N, S> ColumnFamilyHandle getColumnFamily(
- StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
+ private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tryRegisterKvStateInformation(
+ StateDescriptor<?, S> stateDesc,
+ TypeSerializer<N> namespaceSerializer) throws StateMigrationException, IOException {
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
- kvStateInformation.get(descriptor.getName());
-
- RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
- descriptor.getType(),
- descriptor.getName(),
- namespaceSerializer,
- descriptor.getSerializer());
+ kvStateInformation.get(stateDesc.getName());
+ RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo;
if (stateInfo != null) {
- // TODO with eager registration in place, these checks should be moved to restore()
- RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
- (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName());
+ @SuppressWarnings("unchecked")
+ RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfoSnapshot =
+ (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(stateDesc.getName());
Preconditions.checkState(
- Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()),
- "Incompatible state names. " +
- "Was [" + restoredMetaInfo.getName() + "], " +
- "registered with [" + newMetaInfo.getName() + "].");
-
- if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)
- && !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) {
-
- Preconditions.checkState(
- newMetaInfo.getStateType() == restoredMetaInfo.getStateType(),
- "Incompatible state types. " +
- "Was [" + restoredMetaInfo.getStateType() + "], " +
- "registered with [" + newMetaInfo.getStateType() + "].");
- }
+ restoredMetaInfoSnapshot != null,
+ "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
+ " but its corresponding restored snapshot cannot be found.");
- // check compatibility results to determine if state migration is required
- CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
- restoredMetaInfo.getNamespaceSerializer(),
- null,
- restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
- newMetaInfo.getNamespaceSerializer());
+ newMetaInfo = RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
+ restoredMetaInfoSnapshot,
+ namespaceSerializer,
+ stateDesc);
- CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
- restoredMetaInfo.getStateSerializer(),
- UnloadableDummyTypeSerializer.class,
- restoredMetaInfo.getStateSerializerConfigSnapshot(),
- newMetaInfo.getStateSerializer());
+ stateInfo.f1 = newMetaInfo;
+ } else {
+ String stateName = stateDesc.getName();
- if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
- // TODO state migration currently isn't possible.
- throw new StateMigrationException("State migration isn't supported, yet.");
- } else {
- stateInfo.f1 = newMetaInfo;
- return stateInfo.f0;
- }
+ newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+ stateDesc.getType(),
+ stateName,
+ namespaceSerializer,
+ stateDesc.getSerializer());
+
+ ColumnFamilyHandle columnFamily = createColumnFamily(stateName);
+
+ stateInfo = Tuple2.of(columnFamily, newMetaInfo);
+ kvStateInformation.put(stateDesc.getName(), stateInfo);
}
- byte[] nameBytes = descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
+ return Tuple2.of(stateInfo.f0, newMetaInfo);
+ }
+
+ /**
+ * Creates a column family handle for use with a k/v state.
+ */
+ private ColumnFamilyHandle createColumnFamily(String stateName) throws IOException {
+ byte[] nameBytes = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");
ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions);
- final ColumnFamilyHandle columnFamily;
-
try {
- columnFamily = db.createColumnFamily(columnDescriptor);
+ return db.createColumnFamily(columnDescriptor);
} catch (RocksDBException e) {
throw new IOException("Error creating ColumnFamilyHandle.", e);
}
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tuple =
- new Tuple2<>(columnFamily, newMetaInfo);
- kvStateInformation.put(descriptor.getName(), tuple);
- return columnFamily;
}
@Override
@@ -1206,9 +1187,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<T> stateDesc) throws Exception {
- ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
+ tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
- return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this);
+ return new RocksDBValueState<>(
+ registerResult.f0,
+ registerResult.f1.getNamespaceSerializer(),
+ registerResult.f1.getStateSerializer(),
+ stateDesc.getDefaultValue(),
+ this);
}
@Override
@@ -1216,9 +1203,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<T> stateDesc) throws Exception {
- ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, List<T>>> registerResult =
+ tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
- return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this);
+ return new RocksDBListState<>(
+ registerResult.f0,
+ registerResult.f1.getNamespaceSerializer(),
+ registerResult.f1.getStateSerializer(),
+ stateDesc.getDefaultValue(),
+ stateDesc.getElementSerializer(),
+ this);
}
@Override
@@ -1226,9 +1220,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<T> stateDesc) throws Exception {
- ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
+ tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
- return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this);
+ return new RocksDBReducingState<>(
+ registerResult.f0,
+ registerResult.f1.getNamespaceSerializer(),
+ registerResult.f1.getStateSerializer(),
+ stateDesc.getDefaultValue(),
+ stateDesc.getReduceFunction(),
+ this);
}
@Override
@@ -1236,8 +1237,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
TypeSerializer<N> namespaceSerializer,
AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
- ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
- return new RocksDBAggregatingState<>(columnFamily, namespaceSerializer, stateDesc, this);
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
+ tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
+
+ return new RocksDBAggregatingState<>(
+ registerResult.f0,
+ registerResult.f1.getNamespaceSerializer(),
+ registerResult.f1.getStateSerializer(),
+ stateDesc.getDefaultValue(),
+ stateDesc.getAggregateFunction(),
+ this);
}
@Override
@@ -1245,9 +1254,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
- ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
+ tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
- return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
+ return new RocksDBFoldingState<>(
+ registerResult.f0,
+ registerResult.f1.getNamespaceSerializer(),
+ registerResult.f1.getStateSerializer(),
+ stateDesc.getDefaultValue(),
+ stateDesc.getFoldFunction(),
+ this);
}
@Override
@@ -1255,9 +1271,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
TypeSerializer<N> namespaceSerializer,
MapStateDescriptor<UK, UV> stateDesc) throws Exception {
- ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, Map<UK, UV>>> registerResult =
+ tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
- return new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this);
+ return new RocksDBMapState<>(
+ registerResult.f0,
+ registerResult.f1.getNamespaceSerializer(),
+ registerResult.f1.getStateSerializer(),
+ stateDesc.getDefaultValue(),
+ this);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/07aa2d46/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index d9bcb6a..e0e5d52 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -19,7 +19,6 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -47,11 +46,11 @@ import java.util.List;
* @param <V> The type of the values in the list state.
*/
public class RocksDBListState<K, N, V>
- extends AbstractRocksDBState<K, N, List<V>, ListState<V>, ListStateDescriptor<V>>
+ extends AbstractRocksDBState<K, N, List<V>, ListState<V>>
implements InternalListState<K, N, V> {
/** Serializer for the values. */
- private final TypeSerializer<V> valueSerializer;
+ private final TypeSerializer<V> elementSerializer;
/**
* Separator of StringAppendTestOperator in RocksDB.
@@ -61,17 +60,23 @@ public class RocksDBListState<K, N, V>
/**
* Creates a new {@code RocksDBListState}.
*
+ * @param columnFamily The RocksDB column family that this state is associated to.
* @param namespaceSerializer The serializer for the namespace.
- * @param stateDesc The state identifier for the state. This contains name
- * and can create a default state value.
+ * @param valueSerializer The serializer for the state.
+ * @param defaultValue The default value for the state.
+ * @param elementSerializer The serializer for elements of the list state.
+ * @param backend The backend to which this state is bound.
*/
- public RocksDBListState(ColumnFamilyHandle columnFamily,
+ public RocksDBListState(
+ ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
- ListStateDescriptor<V> stateDesc,
+ TypeSerializer<List<V>> valueSerializer,
+ List<V> defaultValue,
+ TypeSerializer<V> elementSerializer,
RocksDBKeyedStateBackend<K> backend) {
- super(columnFamily, namespaceSerializer, stateDesc, backend);
- this.valueSerializer = stateDesc.getElementSerializer();
+ super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
+ this.elementSerializer = elementSerializer;
}
@Override
@@ -86,7 +91,7 @@ public class RocksDBListState<K, N, V>
@Override
public TypeSerializer<List<V>> getValueSerializer() {
- return stateDesc.getSerializer();
+ return valueSerializer;
}
@Override
@@ -105,7 +110,7 @@ public class RocksDBListState<K, N, V>
List<V> result = new ArrayList<>();
while (in.available() > 0) {
- result.add(valueSerializer.deserialize(in));
+ result.add(elementSerializer.deserialize(in));
if (in.available() > 0) {
in.readByte();
}
@@ -125,7 +130,7 @@ public class RocksDBListState<K, N, V>
byte[] key = keySerializationStream.toByteArray();
keySerializationStream.reset();
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
- valueSerializer.serialize(value, out);
+ elementSerializer.serialize(value, out);
backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
} catch (Exception e) {
throw new RuntimeException("Error while adding data to RocksDB", e);
@@ -227,7 +232,7 @@ public class RocksDBListState<K, N, V>
} else {
keySerializationStream.write(DELIMITER);
}
- valueSerializer.serialize(value, out);
+ elementSerializer.serialize(value, out);
}
return keySerializationStream.toByteArray();