You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/06/18 12:42:25 UTC
[1/2] flink git commit: [FLINK-9571] Repace StateBinder with internal
backend-specific state factories
Repository: flink
Updated Branches:
refs/heads/master 0e9b066aa -> 0bdde8377
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index 21a211e..aac5240 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -41,7 +43,6 @@ import java.util.List;
class HeapListState<K, N, V>
extends AbstractHeapMergingState<K, N, V, List<V>, Iterable<V>>
implements InternalListState<K, N, V> {
-
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
@@ -51,7 +52,7 @@ class HeapListState<K, N, V>
* @param namespaceSerializer The serializer for the namespace.
* @param defaultValue The default value for the state.
*/
- HeapListState(
+ private HeapListState(
StateTable<K, N, List<V>> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<List<V>> valueSerializer,
@@ -183,4 +184,17 @@ class HeapListState<K, N, V>
});
}
}
+
+ @SuppressWarnings("unchecked")
+ static <E, K, N, SV, S extends State, IS extends S> IS create(
+ StateDescriptor<S, SV> stateDesc,
+ StateTable<K, N, SV> stateTable,
+ TypeSerializer<K> keySerializer) {
+ return (IS) new HeapListState<>(
+ (StateTable<K, N, List<E>>) stateTable,
+ keySerializer,
+ (TypeSerializer<List<E>>) stateTable.getStateSerializer(),
+ stateTable.getNamespaceSerializer(),
+ (List<E>) stateDesc.getDefaultValue());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index 34e5f8e..745e7f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -51,7 +53,7 @@ class HeapMapState<K, N, UK, UV>
* @param namespaceSerializer The serializer for the namespace.
* @param defaultValue The default value for the state.
*/
- HeapMapState(
+ private HeapMapState(
StateTable<K, N, Map<UK, UV>> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<Map<UK, UV>> valueSerializer,
@@ -85,7 +87,7 @@ class HeapMapState<K, N, UK, UV>
if (userMap == null) {
return null;
}
-
+
return userMap.get(userKey);
}
@@ -140,7 +142,7 @@ class HeapMapState<K, N, UK, UV>
Map<UK, UV> userMap = stateTable.get(currentNamespace);
return userMap == null ? null : userMap.entrySet();
}
-
+
@Override
public Iterable<UK> keys() {
Map<UK, UV> userMap = stateTable.get(currentNamespace);
@@ -187,4 +189,17 @@ class HeapMapState<K, N, UK, UV>
return KvStateSerializer.serializeMap(result.entrySet(), dupUserKeySerializer, dupUserValueSerializer);
}
+
+ @SuppressWarnings("unchecked")
+ static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(
+ StateDescriptor<S, SV> stateDesc,
+ StateTable<K, N, SV> stateTable,
+ TypeSerializer<K> keySerializer) {
+ return (IS) new HeapMapState<>(
+ (StateTable<K, N, Map<UK, UV>>) stateTable,
+ keySerializer,
+ (TypeSerializer<Map<UK, UV>>) stateTable.getStateSerializer(),
+ stateTable.getNamespaceSerializer(),
+ (Map<UK, UV>) stateDesc.getDefaultValue());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
index b9cee8d..7e9bff0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalReducingState;
@@ -50,13 +53,13 @@ class HeapReducingState<K, N, V>
* @param defaultValue The default value for the state.
* @param reduceFunction The reduce function used for reducing state.
*/
- public HeapReducingState(
- StateTable<K, N, V> stateTable,
- TypeSerializer<K> keySerializer,
- TypeSerializer<V> valueSerializer,
- TypeSerializer<N> namespaceSerializer,
- V defaultValue,
- ReduceFunction<V> reduceFunction) {
+ private HeapReducingState(
+ StateTable<K, N, V> stateTable,
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<V> valueSerializer,
+ TypeSerializer<N> namespaceSerializer,
+ V defaultValue,
+ ReduceFunction<V> reduceFunction) {
super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
this.reduceTransformation = new ReduceTransformation<>(reduceFunction);
@@ -123,4 +126,18 @@ class HeapReducingState<K, N, V>
return previousState != null ? reduceFunction.reduce(previousState, value) : value;
}
}
+
+ @SuppressWarnings("unchecked")
+ static <K, N, SV, S extends State, IS extends S> IS create(
+ StateDescriptor<S, SV> stateDesc,
+ StateTable<K, N, SV> stateTable,
+ TypeSerializer<K> keySerializer) {
+ return (IS) new HeapReducingState<>(
+ stateTable,
+ keySerializer,
+ stateTable.getStateSerializer(),
+ stateTable.getNamespaceSerializer(),
+ stateDesc.getDefaultValue(),
+ ((ReducingStateDescriptor<SV>) stateDesc).getReduceFunction());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
index 9e0687f..55b9c32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.state.heap;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalValueState;
@@ -42,7 +44,7 @@ class HeapValueState<K, N, V>
* @param namespaceSerializer The serializer for the namespace.
* @param defaultValue The default value for the state.
*/
- HeapValueState(
+ private HeapValueState(
StateTable<K, N, V> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<V> valueSerializer,
@@ -87,4 +89,17 @@ class HeapValueState<K, N, V>
stateTable.put(currentNamespace, value);
}
+
+ @SuppressWarnings("unchecked")
+ static <K, N, SV, S extends State, IS extends S> IS create(
+ StateDescriptor<S, SV> stateDesc,
+ StateTable<K, N, SV> stateTable,
+ TypeSerializer<K> keySerializer) {
+ return (IS) new HeapValueState<>(
+ stateTable,
+ keySerializer,
+ stateTable.getStateSerializer(),
+ stateTable.getNamespaceSerializer(),
+ stateDesc.getDefaultValue());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 37e2714..ad67171 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -3545,7 +3545,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
// insert some data to the backend.
- InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
+ InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
@@ -3602,7 +3602,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
try {
backend = createKeyedBackend(IntSerializer.INSTANCE);
- InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
+ InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
@@ -3649,7 +3649,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
try {
backend = restoreKeyedBackend(IntSerializer.INSTANCE, stateHandle);
- InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
+ InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
@@ -3791,7 +3791,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
return;
}
- InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
+ InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index e646c64..3c06b71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -120,9 +120,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
try {
InternalValueState<String, VoidNamespace, String> state =
- stateBackend.createValueState(
- new VoidNamespaceSerializer(),
- stateDescriptor);
+ stateBackend.createState(new VoidNamespaceSerializer(), stateDescriptor);
stateBackend.setCurrentKey("A");
state.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -163,7 +161,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
stateBackend.restore(StateObjectCollection.singleton(stateHandle));
- InternalValueState<String, VoidNamespace, String> state = stateBackend.createValueState(
+ InternalValueState<String, VoidNamespace, String> state = stateBackend.createState(
new VoidNamespaceSerializer(),
stateDescriptor);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
index 345cd4f..249d0c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -73,7 +73,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
keyedBackend.restore(StateObjectCollection.singleton(stateHandles.getJobManagerOwnedSnapshot()));
- InternalMapState<String, Integer, Long, Long> state = keyedBackend.createMapState(IntSerializer.INSTANCE, stateDescr);
+ InternalMapState<String, Integer, Long, Long> state = keyedBackend.createState(IntSerializer.INSTANCE, stateDescr);
keyedBackend.setCurrentKey("abc");
state.setCurrentNamespace(namespace1);
@@ -209,7 +209,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
/**
* [FLINK-5979]
*
- * This test takes a snapshot that was created with Flink 1.2 and tries to restore it in master to check
+ * <p>This test takes a snapshot that was created with Flink 1.2 and tries to restore it in master to check
* the backwards compatibility of the serialization format of {@link StateTable}s.
*/
@Test
@@ -233,7 +233,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
- InternalListState<String, Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
+ InternalListState<String, Integer, Long> state = keyedBackend.createState(IntSerializer.INSTANCE, stateDescr);
assertEquals(7, keyedBackend.numStateEntries());
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 76f59d1..ceae3e1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -20,9 +20,14 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -39,9 +44,9 @@ import java.util.Collection;
* @param <ACC> The type of the value stored in the state (the accumulator type)
* @param <R> The type of the value returned from the state
*/
-public class RocksDBAggregatingState<K, N, T, ACC, R>
- extends AbstractRocksDBAppendingState<K, N, T, ACC, R, AggregatingState<T, R>>
- implements InternalAggregatingState<K, N, T, ACC, R> {
+class RocksDBAggregatingState<K, N, T, ACC, R>
+ extends AbstractRocksDBAppendingState<K, N, T, ACC, R, AggregatingState<T, R>>
+ implements InternalAggregatingState<K, N, T, ACC, R> {
/** User-specified aggregation function. */
private final AggregateFunction<T, ACC, R> aggFunction;
@@ -56,7 +61,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
* @param aggFunction The aggregate function used for aggregating state.
* @param backend The backend for which this state is bind to.
*/
- public RocksDBAggregatingState(
+ private RocksDBAggregatingState(
ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<ACC> valueSerializer,
@@ -168,4 +173,18 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
throw new FlinkRuntimeException("Error while merging state in RocksDB", e);
}
}
+
+ @SuppressWarnings("unchecked")
+ static <K, N, SV, S extends State, IS extends S> IS create(
+ StateDescriptor<S, SV> stateDesc,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ RocksDBKeyedStateBackend<K> backend) {
+ return (IS) new RocksDBAggregatingState<>(
+ registerResult.f0,
+ registerResult.f1.getNamespaceSerializer(),
+ registerResult.f1.getStateSerializer(),
+ stateDesc.getDefaultValue(),
+ ((AggregatingStateDescriptor<?, SV, ?>) stateDesc).getAggregateFunction(),
+ backend);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index e2ef32b..d814c26 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -20,7 +20,12 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.rocksdb.ColumnFamilyHandle;
@@ -36,9 +41,9 @@ import org.rocksdb.ColumnFamilyHandle;
* @deprecated will be removed in a future version
*/
@Deprecated
-public class RocksDBFoldingState<K, N, T, ACC>
- extends AbstractRocksDBAppendingState<K, N, T, ACC, ACC, FoldingState<T, ACC>>
- implements InternalFoldingState<K, N, T, ACC> {
+class RocksDBFoldingState<K, N, T, ACC>
+ extends AbstractRocksDBAppendingState<K, N, T, ACC, ACC, FoldingState<T, ACC>>
+ implements InternalFoldingState<K, N, T, ACC> {
/** User-specified fold function. */
private final FoldFunction<T, ACC> foldFunction;
@@ -53,7 +58,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
* @param foldFunction The fold function used for folding state.
* @param backend The backend for which this state is bind to.
*/
- public RocksDBFoldingState(ColumnFamilyHandle columnFamily,
+ private RocksDBFoldingState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<ACC> valueSerializer,
ACC defaultValue,
@@ -93,4 +98,18 @@ public class RocksDBFoldingState<K, N, T, ACC>
accumulator = foldFunction.fold(accumulator, value);
updateInternal(key, accumulator);
}
+
+ @SuppressWarnings("unchecked")
+ static <K, N, SV, S extends State, IS extends S> IS create(
+ StateDescriptor<S, SV> stateDesc,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ RocksDBKeyedStateBackend<K> backend) {
+ return (IS) new RocksDBFoldingState<>(
+ registerResult.f0,
+ registerResult.f1.getNamespaceSerializer(),
+ registerResult.f1.getStateSerializer(),
+ stateDesc.getDefaultValue(),
+ ((FoldingStateDescriptor<?, SV>) stateDesc).getFoldFunction(),
+ backend);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f997f8d..e5f443a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
@@ -76,12 +77,6 @@ import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.runtime.state.internal.InternalFoldingState;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
@@ -133,6 +128,7 @@ import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@@ -156,6 +152,23 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/** File suffix of sstable files. */
private static final String SST_FILE_SUFFIX = ".sst";
+ private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+ Stream.of(
+ Tuple2.of(ValueStateDescriptor.class, (StateFactory) RocksDBValueState::create),
+ Tuple2.of(ListStateDescriptor.class, (StateFactory) RocksDBListState::create),
+ Tuple2.of(MapStateDescriptor.class, (StateFactory) RocksDBMapState::create),
+ Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) RocksDBAggregatingState::create),
+ Tuple2.of(ReducingStateDescriptor.class, (StateFactory) RocksDBReducingState::create),
+ Tuple2.of(FoldingStateDescriptor.class, (StateFactory) RocksDBFoldingState::create)
+ ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+ private interface StateFactory {
+ <K, N, SV, S extends State, IS extends S> IS createState(
+ StateDescriptor<S, SV> stateDesc,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ RocksDBKeyedStateBackend<K> backend) throws Exception;
+ }
+
/** String that identifies the operator that owns this backend. */
private final String operatorIdentifier;
@@ -1303,103 +1316,18 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- protected <N, T> InternalValueState<K, N, T> createValueState(
- TypeSerializer<N> namespaceSerializer,
- ValueStateDescriptor<T> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBValueState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- this);
- }
-
- @Override
- protected <N, T> InternalListState<K, N, T> createListState(
- TypeSerializer<N> namespaceSerializer,
- ListStateDescriptor<T> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, List<T>>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBListState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getElementSerializer(),
- this);
- }
-
- @Override
- protected <N, T> InternalReducingState<K, N, T> createReducingState(
- TypeSerializer<N> namespaceSerializer,
- ReducingStateDescriptor<T> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBReducingState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getReduceFunction(),
- this);
- }
-
- @Override
- protected <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
- TypeSerializer<N> namespaceSerializer,
- AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBAggregatingState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getAggregateFunction(),
- this);
- }
-
- @Override
- protected <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
+ public <N, SV, S extends State, IS extends S> IS createState(
TypeSerializer<N> namespaceSerializer,
- FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBFoldingState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getFoldFunction(),
- this);
- }
-
- @Override
- protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
- TypeSerializer<N> namespaceSerializer,
- MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, Map<UK, UV>>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBMapState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- this);
+ StateDescriptor<S, SV> stateDesc) throws Exception {
+ StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
+ if (stateFactory == null) {
+ String message = String.format("State %s is not supported by %s",
+ stateDesc.getClass(), this.getClass());
+ throw new FlinkRuntimeException(message);
+ }
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult =
+ tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
+ return stateFactory.createState(stateDesc, registerResult, RocksDBKeyedStateBackend.this);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 3b27316..03faa44 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -19,9 +19,14 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -46,9 +51,9 @@ import java.util.List;
* @param <N> The type of the namespace.
* @param <V> The type of the values in the list state.
*/
-public class RocksDBListState<K, N, V>
- extends AbstractRocksDBState<K, N, List<V>, ListState<V>>
- implements InternalListState<K, N, V> {
+class RocksDBListState<K, N, V>
+ extends AbstractRocksDBState<K, N, List<V>, ListState<V>>
+ implements InternalListState<K, N, V> {
/** Serializer for the values. */
private final TypeSerializer<V> elementSerializer;
@@ -68,7 +73,7 @@ public class RocksDBListState<K, N, V>
* @param elementSerializer The serializer for elements of the list state.
* @param backend The backend for which this state is bind to.
*/
- public RocksDBListState(
+ private RocksDBListState(
ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<List<V>> valueSerializer,
@@ -183,7 +188,12 @@ public class RocksDBListState<K, N, V>
}
@Override
- public void update(List<V> values) {
+ public void update(List<V> valueToStore) {
+ updateInternal(valueToStore);
+ }
+
+ @Override
+ public void updateInternal(List<V> values) {
Preconditions.checkNotNull(values, "List of values to add cannot be null.");
clear();
@@ -244,8 +254,17 @@ public class RocksDBListState<K, N, V>
return keySerializationStream.toByteArray();
}
- @Override
- public void updateInternal(List<V> valueToStore) {
- update(valueToStore);
+ @SuppressWarnings("unchecked")
+ static <E, K, N, SV, S extends State, IS extends S> IS create(
+ StateDescriptor<S, SV> stateDesc,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ RocksDBKeyedStateBackend<K> backend) {
+ return (IS) new RocksDBListState<>(
+ registerResult.f0,
+ registerResult.f1.getNamespaceSerializer(),
+ (TypeSerializer<List<E>>) registerResult.f1.getStateSerializer(),
+ (List<E>) stateDesc.getDefaultValue(),
+ ((ListStateDescriptor<E>) stateDesc).getElementSerializer(),
+ backend);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 65d81ac..9d00a67 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -19,6 +19,8 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -28,6 +30,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -39,6 +42,7 @@ import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import java.io.IOException;
@@ -58,9 +62,9 @@ import java.util.Map;
* @param <UK> The type of the keys in the map state.
* @param <UV> The type of the values in the map state.
*/
-public class RocksDBMapState<K, N, UK, UV>
- extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>>
- implements InternalMapState<K, N, UK, UV> {
+class RocksDBMapState<K, N, UK, UV>
+ extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>>
+ implements InternalMapState<K, N, UK, UV> {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
@@ -77,7 +81,7 @@ public class RocksDBMapState<K, N, UK, UV>
* @param defaultValue The default value for the state.
* @param backend The backend for which this state is bind to.
*/
- public RocksDBMapState(
+ private RocksDBMapState(
ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<Map<UK, UV>> valueSerializer,
@@ -160,60 +164,45 @@ public class RocksDBMapState<K, N, UK, UV>
}
@Override
- public Iterable<Map.Entry<UK, UV>> entries() throws IOException, RocksDBException {
+ public Iterable<Map.Entry<UK, UV>> entries() throws IOException {
final Iterator<Map.Entry<UK, UV>> iterator = iterator();
// Return null to make the behavior consistent with other states.
if (!iterator.hasNext()) {
return null;
} else {
- return new Iterable<Map.Entry<UK, UV>>() {
- @Override
- public Iterator<Map.Entry<UK, UV>> iterator() {
- return iterator;
- }
- };
+ return () -> iterator;
}
}
@Override
- public Iterable<UK> keys() throws IOException, RocksDBException {
+ public Iterable<UK> keys() throws IOException {
final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
- return new Iterable<UK>() {
+ return () -> new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
@Override
- public Iterator<UK> iterator() {
- return new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
- @Override
- public UK next() {
- RocksDBMapEntry entry = nextEntry();
- return (entry == null ? null : entry.getKey());
- }
- };
+ public UK next() {
+ RocksDBMapEntry entry = nextEntry();
+ return (entry == null ? null : entry.getKey());
}
};
}
@Override
- public Iterable<UV> values() throws IOException, RocksDBException {
+ public Iterable<UV> values() throws IOException {
final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
- return new Iterable<UV>() {
+ return () -> new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
@Override
- public Iterator<UV> iterator() {
- return new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
- @Override
- public UV next() {
- RocksDBMapEntry entry = nextEntry();
- return (entry == null ? null : entry.getValue());
- }
- };
+ public UV next() {
+ RocksDBMapEntry entry = nextEntry();
+ return (entry == null ? null : entry.getValue());
}
};
}
@Override
- public Iterator<Map.Entry<UK, UV>> iterator() throws IOException, RocksDBException {
+ public Iterator<Map.Entry<UK, UV>> iterator() throws IOException {
final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
@@ -305,12 +294,7 @@ public class RocksDBMapState<K, N, UK, UV>
return null;
}
- return KvStateSerializer.serializeMap(new Iterable<Map.Entry<UK, UV>>() {
- @Override
- public Iterator<Map.Entry<UK, UV>> iterator() {
- return iterator;
- }
- }, dupUserKeySerializer, dupUserValueSerializer);
+ return KvStateSerializer.serializeMap(() -> iterator, dupUserKeySerializer, dupUserValueSerializer);
}
// ------------------------------------------------------------------------
@@ -416,7 +400,7 @@ public class RocksDBMapState<K, N, UK, UV>
RocksDBMapEntry(
@Nonnull final RocksDB db,
- @Nonnull final int userKeyOffset,
+ @Nonnegative final int userKeyOffset,
@Nonnull final byte[] rawKeyBytes,
@Nonnull final byte[] rawValueBytes,
@Nonnull final TypeSerializer<UK> keySerializer,
@@ -628,4 +612,17 @@ public class RocksDBMapState<K, N, UK, UV>
}
}
}
+
+ @SuppressWarnings("unchecked")
+ static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(
+ StateDescriptor<S, SV> stateDesc,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ RocksDBKeyedStateBackend<K> backend) {
+ return (IS) new RocksDBMapState<>(
+ registerResult.f0,
+ registerResult.f1.getNamespaceSerializer(),
+ (TypeSerializer<Map<UK, UV>>) registerResult.f1.getStateSerializer(),
+ (Map<UK, UV>) stateDesc.getDefaultValue(),
+ backend);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index ead40b2..d138045 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -20,9 +20,14 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -38,9 +43,9 @@ import java.util.Collection;
* @param <N> The type of the namespace.
* @param <V> The type of value that the state state stores.
*/
-public class RocksDBReducingState<K, N, V>
- extends AbstractRocksDBAppendingState<K, N, V, V, V, ReducingState<V>>
- implements InternalReducingState<K, N, V> {
+class RocksDBReducingState<K, N, V>
+ extends AbstractRocksDBAppendingState<K, N, V, V, V, ReducingState<V>>
+ implements InternalReducingState<K, N, V> {
/** User-specified reduce function. */
private final ReduceFunction<V> reduceFunction;
@@ -55,7 +60,7 @@ public class RocksDBReducingState<K, N, V>
* @param reduceFunction The reduce function used for reducing state.
* @param backend The backend for which this state is bind to.
*/
- public RocksDBReducingState(ColumnFamilyHandle columnFamily,
+ private RocksDBReducingState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<V> valueSerializer,
V defaultValue,
@@ -163,4 +168,18 @@ public class RocksDBReducingState<K, N, V>
throw new FlinkRuntimeException("Error while merging state in RocksDB", e);
}
}
+
+ @SuppressWarnings("unchecked")
+ static <K, N, SV, S extends State, IS extends S> IS create(
+ StateDescriptor<S, SV> stateDesc,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ RocksDBKeyedStateBackend<K> backend) {
+ return (IS) new RocksDBReducingState<>(
+ registerResult.f0,
+ registerResult.f1.getNamespaceSerializer(),
+ registerResult.f1.getStateSerializer(),
+ stateDesc.getDefaultValue(),
+ ((ReducingStateDescriptor<SV>) stateDesc).getReduceFunction(),
+ backend);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 92e9a34..2b60fc1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -18,10 +18,14 @@
package org.apache.flink.contrib.streaming.state;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -38,9 +42,9 @@ import java.io.IOException;
* @param <N> The type of the namespace.
* @param <V> The type of value that the state state stores.
*/
-public class RocksDBValueState<K, N, V>
- extends AbstractRocksDBState<K, N, V, ValueState<V>>
- implements InternalValueState<K, N, V> {
+class RocksDBValueState<K, N, V>
+ extends AbstractRocksDBState<K, N, V, ValueState<V>>
+ implements InternalValueState<K, N, V> {
/**
* Creates a new {@code RocksDBValueState}.
@@ -51,7 +55,7 @@ public class RocksDBValueState<K, N, V>
* @param defaultValue The default value for the state.
* @param backend The backend for which this state is bind to.
*/
- public RocksDBValueState(
+ private RocksDBValueState(
ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<V> valueSerializer,
@@ -92,7 +96,7 @@ public class RocksDBValueState<K, N, V>
}
@Override
- public void update(V value) throws IOException {
+ public void update(V value) {
if (value == null) {
clear();
return;
@@ -108,4 +112,17 @@ public class RocksDBValueState<K, N, V>
throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
}
}
+
+ @SuppressWarnings("unchecked")
+ static <K, N, SV, S extends State, IS extends S> IS create(
+ StateDescriptor<S, SV> stateDesc,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ RocksDBKeyedStateBackend<K> backend) {
+ return (IS) new RocksDBValueState<>(
+ registerResult.f0,
+ registerResult.f1.getNamespaceSerializer(),
+ registerResult.f1.getStateSerializer(),
+ stateDesc.getDefaultValue(),
+ backend);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java
index 86faea3..6b079d6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.contrib.streaming.state.benchmark;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBMapState;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.testutils.junit.RetryOnFailure;
import org.apache.flink.util.TestLogger;
@@ -36,7 +35,6 @@ import org.rocksdb.WriteOptions;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
/**
* Test that validates that the performance of RocksDB's WriteBatch as expected.
@@ -63,7 +61,7 @@ import java.util.Map;
* restoring from savepoint, because we need to set disableWAL=true to avoid segfault bug, see FLINK-8859 for detail).
*
* <p>Write gives user 1.5x performance improvements when disableWAL is true, this is useful for batch writing scenario,
- * e.g. {@link RocksDBMapState#putAll(Map)} & {@link RocksDBMapState#clear()}.
+ * e.g. RocksDBMapState.putAll(Map) & RocksDBMapState.clear().
*/
public class RocksDBWriteBatchPerformanceTest extends TestLogger {
[2/2] flink git commit: [FLINK-9571] Repace StateBinder with internal
backend-specific state factories
Posted by sr...@apache.org.
[FLINK-9571] Repace StateBinder with internal backend-specific state factories
This closes #6173.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bdde837
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bdde837
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bdde837
Branch: refs/heads/master
Commit: 0bdde8377c254195fe94709d639bf03f9bd77606
Parents: 0e9b066
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Wed Jun 13 10:24:10 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Jun 18 14:41:50 2018 +0200
----------------------------------------------------------------------
.../state/AggregatingStateDescriptor.java | 7 -
.../common/state/FoldingStateDescriptor.java | 7 -
.../api/common/state/ListStateDescriptor.java | 7 -
.../api/common/state/MapStateDescriptor.java | 5 -
.../common/state/ReducingStateDescriptor.java | 7 -
.../flink/api/common/state/StateBinder.java | 85 ----------
.../flink/api/common/state/StateDescriptor.java | 10 +-
.../api/common/state/ValueStateDescriptor.java | 7 -
.../api/common/state/StateDescriptorTest.java | 10 --
.../client/QueryableStateClient.java | 57 ++++++-
.../client/state/ImmutableAggregatingState.java | 22 +--
.../client/state/ImmutableFoldingState.java | 18 +-
.../client/state/ImmutableListState.java | 28 ++--
.../client/state/ImmutableMapState.java | 31 ++--
.../client/state/ImmutableReducingState.java | 18 +-
.../client/state/ImmutableStateBinder.java | 80 ---------
.../client/state/ImmutableValueState.java | 18 +-
.../state/ImmutableAggregatingStateTest.java | 7 +-
.../client/state/ImmutableFoldingStateTest.java | 7 +-
.../client/state/ImmutableListStateTest.java | 9 +-
.../client/state/ImmutableMapStateTest.java | 19 ++-
.../state/ImmutableReducingStateTest.java | 7 +-
.../client/state/ImmutableValueStateTest.java | 8 +-
.../KVStateRequestSerializerRocksDBTest.java | 50 +-----
.../network/KvStateRequestSerializerTest.java | 6 +-
.../state/AbstractKeyedStateBackend.java | 164 +++----------------
.../state/heap/HeapAggregatingState.java | 24 ++-
.../runtime/state/heap/HeapFoldingState.java | 20 ++-
.../state/heap/HeapKeyedStateBackend.java | 125 ++++----------
.../flink/runtime/state/heap/HeapListState.java | 18 +-
.../flink/runtime/state/heap/HeapMapState.java | 21 ++-
.../runtime/state/heap/HeapReducingState.java | 31 +++-
.../runtime/state/heap/HeapValueState.java | 17 +-
.../runtime/state/StateBackendTestBase.java | 8 +-
.../state/StateSnapshotCompressionTest.java | 6 +-
...pKeyedStateBackendSnapshotMigrationTest.java | 6 +-
.../state/RocksDBAggregatingState.java | 27 ++-
.../streaming/state/RocksDBFoldingState.java | 27 ++-
.../state/RocksDBKeyedStateBackend.java | 132 ++++-----------
.../streaming/state/RocksDBListState.java | 35 +++-
.../streaming/state/RocksDBMapState.java | 75 ++++-----
.../streaming/state/RocksDBReducingState.java | 27 ++-
.../streaming/state/RocksDBValueState.java | 27 ++-
.../RocksDBWriteBatchPerformanceTest.java | 4 +-
44 files changed, 510 insertions(+), 814 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
index 8c7fed6..1197ed2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
@@ -93,13 +93,6 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<Ag
this.aggFunction = checkNotNull(aggFunction);
}
- // ------------------------------------------------------------------------
-
- @Override
- public AggregatingState<IN, OUT> bind(StateBinder stateBinder) throws Exception {
- return stateBinder.createAggregatingState(this);
- }
-
/**
* Returns the aggregate function to be used for the state.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index c14e4bf..392c04c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -97,13 +97,6 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState
}
}
- // ------------------------------------------------------------------------
-
- @Override
- public FoldingState<T, ACC> bind(StateBinder stateBinder) throws Exception {
- return stateBinder.createFoldingState(this);
- }
-
/**
* Returns the fold function to be used for the folding state.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index aa5e64b..0016c22 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -76,13 +76,6 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
super(name, new ListSerializer<>(typeSerializer), null);
}
- // ------------------------------------------------------------------------
-
- @Override
- public ListState<T> bind(StateBinder stateBinder) throws Exception {
- return stateBinder.createListState(this);
- }
-
/**
* Gets the serializer for the elements contained in the list.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 42b016a..6eb8ddc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -81,11 +81,6 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>
}
@Override
- public MapState<UK, UV> bind(StateBinder stateBinder) throws Exception {
- return stateBinder.createMapState(this);
- }
-
- @Override
public Type getType() {
return Type.MAP;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index 0df1c2c..07b22c9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -83,13 +83,6 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
this.reduceFunction = checkNotNull(reduceFunction);
}
- // ------------------------------------------------------------------------
-
- @Override
- public ReducingState<T> bind(StateBinder stateBinder) throws Exception {
- return stateBinder.createReducingState(this);
- }
-
/**
* Returns the reduce function to be used for the reducing state.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
deleted file mode 100644
index 871b4a8..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * The {@code StateBinder} is used by {@link StateDescriptor} instances to create actual
- * {@link State} objects.
- */
-@Internal
-public interface StateBinder {
-
- /**
- * Creates and returns a new {@link ValueState}.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <T> The type of the value that the {@code ValueState} can store.
- */
- <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link ListState}.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <T> The type of the values that the {@code ListState} can store.
- */
- <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link ReducingState}.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <T> The type of the values that the {@code ReducingState} can store.
- */
- <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link AggregatingState}.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <IN> The type of the values that go into the aggregating state
- * @param <ACC> The type of the values that are stored in the aggregating state
- * @param <OUT> The type of the values that come out of the aggregating state
- */
- <IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(
- AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link FoldingState}.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <T> Type of the values folded into the state
- * @param <ACC> Type of the value in the state
- *
- * @deprecated will be removed in a future version in favor of {@link AggregatingState}
- */
- @Deprecated
- <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link MapState}.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <MK> Type of the keys in the state
- * @param <MV> Type of the values in the state
- */
- <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> stateDesc) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 9b6b51d..6c54e71 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -41,8 +41,7 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
* Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
- * {@link State} in stateful operations. This contains the name and can create an actual state
- * object given a {@link StateBinder} using {@link #bind(StateBinder)}.
+ * {@link State} in stateful operations.
*
* <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
*
@@ -231,13 +230,6 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
return queryableStateName != null;
}
- /**
- * Creates a new {@link State} on the given {@link StateBinder}.
- *
- * @param stateBinder The {@code StateBackend} on which to create the {@link State}.
- */
- public abstract S bind(StateBinder stateBinder) throws Exception;
-
// ------------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index 4d69d81..d2719aa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -122,13 +122,6 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
super(name, typeSerializer, null);
}
- // ------------------------------------------------------------------------
-
- @Override
- public ValueState<T> bind(StateBinder stateBinder) throws Exception {
- return stateBinder.createValueState(this);
- }
-
@Override
public Type getType() {
return Type.VALUE;
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
index 3958baa..4346163 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
@@ -221,11 +221,6 @@ public class StateDescriptorTest {
}
@Override
- public State bind(StateBinder stateBinder) throws Exception {
- throw new UnsupportedOperationException();
- }
-
- @Override
public Type getType() {
return Type.VALUE;
}
@@ -248,11 +243,6 @@ public class StateDescriptorTest {
}
@Override
- public State bind(StateBinder stateBinder) throws Exception {
- throw new UnsupportedOperationException();
- }
-
- @Override
public Type getType() {
return Type.VALUE;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 2a6baf0..470c7ac 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -21,13 +21,25 @@ package org.apache.flink.queryablestate.client;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.FutureUtils;
-import org.apache.flink.queryablestate.client.state.ImmutableStateBinder;
+import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState;
+import org.apache.flink.queryablestate.client.state.ImmutableFoldingState;
+import org.apache.flink.queryablestate.client.state.ImmutableListState;
+import org.apache.flink.queryablestate.client.state.ImmutableMapState;
+import org.apache.flink.queryablestate.client.state.ImmutableReducingState;
+import org.apache.flink.queryablestate.client.state.ImmutableValueState;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
@@ -44,7 +56,10 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Client for querying Flink's managed state.
@@ -67,6 +82,20 @@ public class QueryableStateClient {
private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);
+ private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+ Stream.of(
+ Tuple2.of(ValueStateDescriptor.class, (StateFactory) ImmutableValueState::createState),
+ Tuple2.of(ListStateDescriptor.class, (StateFactory) ImmutableListState::createState),
+ Tuple2.of(MapStateDescriptor.class, (StateFactory) ImmutableMapState::createState),
+ Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) ImmutableAggregatingState::createState),
+ Tuple2.of(ReducingStateDescriptor.class, (StateFactory) ImmutableReducingState::createState),
+ Tuple2.of(FoldingStateDescriptor.class, (StateFactory) ImmutableFoldingState::createState)
+ ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+ private interface StateFactory {
+ <T, S extends State> S createState(StateDescriptor<S, T> stateDesc, byte[] serializedState) throws Exception;
+ }
+
/** The client that forwards the requests to the proxy. */
private final Client<KvStateRequest, KvStateResponse> client;
@@ -241,14 +270,24 @@ public class QueryableStateClient {
return FutureUtils.getFailedFuture(e);
}
- return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
- stateResponse -> {
- try {
- return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
- } catch (Exception e) {
- throw new FlinkRuntimeException(e);
- }
- });
+ return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace)
+ .thenApply(stateResponse -> createState(stateResponse, stateDescriptor));
+ }
+
+ private <T, S extends State> S createState(
+ KvStateResponse stateResponse,
+ StateDescriptor<S, T> stateDescriptor) {
+ StateFactory stateFactory = STATE_FACTORIES.get(stateDescriptor.getClass());
+ if (stateFactory == null) {
+ String message = String.format("State %s is not supported by %s",
+ stateDescriptor.getClass(), this.getClass());
+ throw new FlinkRuntimeException(message);
+ }
+ try {
+ return stateFactory.createState(stateDescriptor, stateResponse.getContent());
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(e);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
index 8964fbf..a83da54 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
@@ -18,9 +18,10 @@
package org.apache.flink.queryablestate.client.state;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.util.Preconditions;
@@ -33,7 +34,6 @@ import java.io.IOException;
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link AggregatingStateDescriptor}.
*/
-@PublicEvolving
public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState implements AggregatingState<IN, OUT> {
private final OUT value;
@@ -57,15 +57,15 @@ public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState imp
throw MODIFICATION_ATTEMPT_ERROR;
}
- public static <IN, ACC, OUT> ImmutableAggregatingState<IN, OUT> createState(
- final AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor,
- final byte[] serializedValue) throws IOException {
-
+ @SuppressWarnings("unchecked")
+ public static <OUT, ACC, S extends State> S createState(
+ StateDescriptor<S, ACC> stateDescriptor,
+ byte[] serializedState) throws IOException {
final ACC accumulator = KvStateSerializer.deserializeValue(
- serializedValue,
- stateDescriptor.getSerializer());
-
- final OUT state = stateDescriptor.getAggregateFunction().getResult(accumulator);
- return new ImmutableAggregatingState<>(state);
+ serializedState,
+ stateDescriptor.getSerializer());
+ final OUT state = ((AggregatingStateDescriptor<?, ACC, OUT>) stateDescriptor).
+ getAggregateFunction().getResult(accumulator);
+ return (S) new ImmutableAggregatingState<>(state);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
index 25f3118..16a94c6 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
@@ -18,9 +18,10 @@
package org.apache.flink.queryablestate.client.state;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.util.Preconditions;
@@ -33,7 +34,6 @@ import java.io.IOException;
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link FoldingStateDescriptor}.
*/
-@PublicEvolving
@Deprecated
public final class ImmutableFoldingState<IN, ACC> extends ImmutableState implements FoldingState<IN, ACC> {
@@ -58,13 +58,13 @@ public final class ImmutableFoldingState<IN, ACC> extends ImmutableState impleme
throw MODIFICATION_ATTEMPT_ERROR;
}
- public static <IN, ACC> ImmutableFoldingState<IN, ACC> createState(
- final FoldingStateDescriptor<IN, ACC> stateDescriptor,
- final byte[] serializedState) throws IOException {
-
+ @SuppressWarnings("unchecked")
+ public static <ACC, S extends State> S createState(
+ StateDescriptor<S, ACC> stateDescriptor,
+ byte[] serializedState) throws IOException {
final ACC state = KvStateSerializer.deserializeValue(
- serializedState,
- stateDescriptor.getSerializer());
- return new ImmutableFoldingState<>(state);
+ serializedState,
+ stateDescriptor.getSerializer());
+ return (S) new ImmutableFoldingState<>(state);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
index 9f1465e..0c86ecf 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
@@ -18,9 +18,10 @@
package org.apache.flink.queryablestate.client.state;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.util.Preconditions;
@@ -34,7 +35,6 @@ import java.util.List;
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link ListStateDescriptor}.
*/
-@PublicEvolving
public final class ImmutableListState<V> extends ImmutableState implements ListState<V> {
private final List<V> listState;
@@ -58,23 +58,23 @@ public final class ImmutableListState<V> extends ImmutableState implements ListS
throw MODIFICATION_ATTEMPT_ERROR;
}
- public static <V> ImmutableListState<V> createState(
- final ListStateDescriptor<V> stateDescriptor,
- final byte[] serializedState) throws IOException {
-
- final List<V> state = KvStateSerializer.deserializeList(
- serializedState,
- stateDescriptor.getElementSerializer());
- return new ImmutableListState<>(state);
- }
-
@Override
- public void update(List<V> values) throws Exception {
+ public void update(List<V> values) {
throw MODIFICATION_ATTEMPT_ERROR;
}
@Override
- public void addAll(List<V> values) throws Exception {
+ public void addAll(List<V> values) {
throw MODIFICATION_ATTEMPT_ERROR;
}
+
+ @SuppressWarnings("unchecked")
+ public static <V, T, S extends State> S createState(
+ StateDescriptor<S, T> stateDescriptor,
+ byte[] serializedState) throws IOException {
+ final List<V> state = KvStateSerializer.deserializeList(
+ serializedState,
+ ((ListStateDescriptor<V>) stateDescriptor).getElementSerializer());
+ return (S) new ImmutableListState<>(state);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
index bb08cf0..4d51b7d 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
@@ -18,9 +18,10 @@
package org.apache.flink.queryablestate.client.state;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.util.Preconditions;
@@ -38,7 +39,6 @@ import java.util.Set;
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link MapStateDescriptor}.
*/
-@PublicEvolving
public final class ImmutableMapState<K, V> extends ImmutableState implements MapState<K, V> {
private final Map<K, V> state;
@@ -76,8 +76,6 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
* Returns all the mappings in the state in a {@link Collections#unmodifiableSet(Set)}.
*
* @return A read-only iterable view of all the key-value pairs in the state.
- *
- * @throws Exception Thrown if the system cannot access the state.
*/
@Override
public Iterable<Map.Entry<K, V>> entries() {
@@ -88,8 +86,6 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
* Returns all the keys in the state in a {@link Collections#unmodifiableSet(Set)}.
*
* @return A read-only iterable view of all the keys in the state.
- *
- * @throws Exception Thrown if the system cannot access the state.
*/
@Override
public Iterable<K> keys() {
@@ -100,8 +96,6 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
* Returns all the values in the state in a {@link Collections#unmodifiableCollection(Collection)}.
*
* @return A read-only iterable view of all the values in the state.
- *
- * @throws Exception Thrown if the system cannot access the state.
*/
@Override
public Iterable<V> values() {
@@ -112,9 +106,7 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
* Iterates over all the mappings in the state. The iterator cannot
* remove elements.
*
- * @return A read-only iterator over all the mappings in the state
- *
- * @throws Exception Thrown if the system cannot access the state.
+ * @return A read-only iterator over all the mappings in the state.
*/
@Override
public Iterator<Map.Entry<K, V>> iterator() {
@@ -126,14 +118,15 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
throw MODIFICATION_ATTEMPT_ERROR;
}
- public static <K, V> ImmutableMapState<K, V> createState(
- final MapStateDescriptor<K, V> stateDescriptor,
- final byte[] serializedState) throws IOException {
-
+ @SuppressWarnings("unchecked")
+ public static <K, V, T, S extends State> S createState(
+ StateDescriptor<S, T> stateDescriptor,
+ byte[] serializedState) throws IOException {
+ MapStateDescriptor<K, V> mapStateDescriptor = (MapStateDescriptor<K, V>) stateDescriptor;
final Map<K, V> state = KvStateSerializer.deserializeMap(
- serializedState,
- stateDescriptor.getKeySerializer(),
- stateDescriptor.getValueSerializer());
- return new ImmutableMapState<>(state);
+ serializedState,
+ mapStateDescriptor.getKeySerializer(),
+ mapStateDescriptor.getValueSerializer());
+ return (S) new ImmutableMapState<>(state);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
index 46b477f..a9990b0 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
@@ -18,9 +18,10 @@
package org.apache.flink.queryablestate.client.state;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.util.Preconditions;
@@ -33,7 +34,6 @@ import java.io.IOException;
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link ReducingStateDescriptor}.
*/
-@PublicEvolving
public final class ImmutableReducingState<V> extends ImmutableState implements ReducingState<V> {
private final V value;
@@ -57,13 +57,13 @@ public final class ImmutableReducingState<V> extends ImmutableState implements R
throw MODIFICATION_ATTEMPT_ERROR;
}
- public static <V> ImmutableReducingState<V> createState(
- final ReducingStateDescriptor<V> stateDescriptor,
- final byte[] serializedState) throws IOException {
-
+ @SuppressWarnings("unchecked")
+ public static <V, S extends State> S createState(
+ StateDescriptor<S, V> stateDescriptor,
+ byte[] serializedState) throws IOException {
final V state = KvStateSerializer.deserializeValue(
- serializedState,
- stateDescriptor.getSerializer());
- return new ImmutableReducingState<>(state);
+ serializedState,
+ stateDescriptor.getSerializer());
+ return (S) new ImmutableReducingState<>(state);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
deleted file mode 100644
index 6ce2787..0000000
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state;
-
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.StateBinder;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.util.Preconditions;
-
-/**
- * A {@link StateBinder} used to deserialize the results returned by the
- * {@link org.apache.flink.queryablestate.client.QueryableStateClient}.
- *
- * <p>The result is an immutable {@link org.apache.flink.api.common.state.State State}
- * object containing the requested result.
- */
-public class ImmutableStateBinder implements StateBinder {
-
- private final byte[] serializedState;
-
- public ImmutableStateBinder(final byte[] content) {
- serializedState = Preconditions.checkNotNull(content);
- }
-
- @Override
- public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
- return ImmutableValueState.createState(stateDesc, serializedState);
- }
-
- @Override
- public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
- return ImmutableListState.createState(stateDesc, serializedState);
- }
-
- @Override
- public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
- return ImmutableReducingState.createState(stateDesc, serializedState);
- }
-
- @Override
- public <IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception {
- return ImmutableAggregatingState.createState(stateDesc, serializedState);
- }
-
- @Override
- public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
- return ImmutableFoldingState.createState(stateDesc, serializedState);
- }
-
- @Override
- public <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> stateDesc) throws Exception {
- return ImmutableMapState.createState(stateDesc, serializedState);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
index f3ddd2b..3a5cab4 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
@@ -18,7 +18,8 @@
package org.apache.flink.queryablestate.client.state;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -33,7 +34,6 @@ import java.io.IOException;
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link ValueStateDescriptor}.
*/
-@PublicEvolving
public final class ImmutableValueState<V> extends ImmutableState implements ValueState<V> {
private final V value;
@@ -57,13 +57,13 @@ public final class ImmutableValueState<V> extends ImmutableState implements Valu
throw MODIFICATION_ATTEMPT_ERROR;
}
- public static <V> ImmutableValueState<V> createState(
- final ValueStateDescriptor<V> stateDescriptor,
- final byte[] serializedState) throws IOException {
-
+ @SuppressWarnings("unchecked")
+ public static <V, S extends State> S createState(
+ StateDescriptor<S, V> stateDescriptor,
+ byte[] serializedState) throws IOException {
final V state = KvStateSerializer.deserializeValue(
- serializedState,
- stateDescriptor.getSerializer());
- return new ImmutableValueState<>(state);
+ serializedState,
+ stateDescriptor.getSerializer());
+ return (S) new ImmutableValueState<>(state);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
index ebbc896..955bf38 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.queryablestate.client.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -41,7 +42,7 @@ public class ImmutableAggregatingStateTest {
new SumAggr(),
String.class);
- private ImmutableAggregatingState<Long, String> aggrState;
+ private AggregatingState<Long, String> aggrState;
@Before
public void setUp() throws Exception {
@@ -61,7 +62,7 @@ public class ImmutableAggregatingStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testUpdate() {
+ public void testUpdate() throws Exception {
String value = aggrState.get();
assertEquals("42", value);
@@ -69,7 +70,7 @@ public class ImmutableAggregatingStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testClear() {
+ public void testClear() throws Exception {
String value = aggrState.get();
assertEquals("42", value);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
index 9e8dfc9..2803d23 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.queryablestate.client.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -43,7 +44,7 @@ public class ImmutableFoldingStateTest {
new SumFold(),
StringSerializer.INSTANCE);
- private ImmutableFoldingState<Long, String> foldingState;
+ private FoldingState<Long, String> foldingState;
@Before
public void setUp() throws Exception {
@@ -61,7 +62,7 @@ public class ImmutableFoldingStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testUpdate() {
+ public void testUpdate() throws Exception {
String value = foldingState.get();
assertEquals("42", value);
@@ -69,7 +70,7 @@ public class ImmutableFoldingStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testClear() {
+ public void testClear() throws Exception {
String value = foldingState.get();
assertEquals("42", value);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
index a78ed1f..19b8514 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.queryablestate.client.state;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -42,7 +43,7 @@ public class ImmutableListStateTest {
private final ListStateDescriptor<Long> listStateDesc =
new ListStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
- private ImmutableListState<Long> listState;
+ private ListState<Long> listState;
@Before
public void setUp() throws Exception {
@@ -58,7 +59,7 @@ public class ImmutableListStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testUpdate() {
+ public void testUpdate() throws Exception {
List<Long> list = getStateContents();
assertEquals(1L, list.size());
@@ -69,7 +70,7 @@ public class ImmutableListStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testClear() {
+ public void testClear() throws Exception {
List<Long> list = getStateContents();
assertEquals(1L, list.size());
@@ -100,7 +101,7 @@ public class ImmutableListStateTest {
return baos.toByteArray();
}
- private List<Long> getStateContents() {
+ private List<Long> getStateContents() throws Exception {
List<Long> list = new ArrayList<>();
for (Long elem: listState.get()) {
list.add(elem);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
index ffeabae..6465257 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.queryablestate.client.state;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -44,7 +45,7 @@ public class ImmutableMapStateTest {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO);
- private ImmutableMapState<Long, Long> mapState;
+ private MapState<Long, Long> mapState;
@Before
public void setUp() throws Exception {
@@ -65,7 +66,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testPut() {
+ public void testPut() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
@@ -78,7 +79,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testPutAll() {
+ public void testPutAll() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
@@ -95,7 +96,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testUpdate() {
+ public void testUpdate() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
@@ -108,7 +109,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testIterator() {
+ public void testIterator() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
@@ -124,7 +125,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testIterable() {
+ public void testIterable() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
@@ -142,7 +143,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testKeys() {
+ public void testKeys() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
@@ -158,7 +159,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testValues() {
+ public void testValues() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
@@ -174,7 +175,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testClear() {
+ public void testClear() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
index 9694f55..543f714 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.queryablestate.client.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -38,7 +39,7 @@ public class ImmutableReducingStateTest {
private final ReducingStateDescriptor<Long> reducingStateDesc =
new ReducingStateDescriptor<>("test", new SumReduce(), BasicTypeInfo.LONG_TYPE_INFO);
- private ImmutableReducingState<Long> reduceState;
+ private ReducingState<Long> reduceState;
@Before
public void setUp() throws Exception {
@@ -53,7 +54,7 @@ public class ImmutableReducingStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testUpdate() {
+ public void testUpdate() throws Exception {
long value = reduceState.get();
assertEquals(42L, value);
@@ -61,7 +62,7 @@ public class ImmutableReducingStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testClear() {
+ public void testClear() throws Exception {
long value = reduceState.get();
assertEquals(42L, value);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
index a0da43d..75f8f03 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
@@ -19,12 +19,14 @@
package org.apache.flink.queryablestate.client.state;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.nio.ByteBuffer;
import static org.junit.Assert.assertEquals;
@@ -37,7 +39,7 @@ public class ImmutableValueStateTest {
private final ValueStateDescriptor<Long> valueStateDesc =
new ValueStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
- private ImmutableValueState<Long> valueState;
+ private ValueState<Long> valueState;
@Before
public void setUp() throws Exception {
@@ -52,7 +54,7 @@ public class ImmutableValueStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testUpdate() {
+ public void testUpdate() throws IOException {
long value = valueState.value();
assertEquals(42L, value);
@@ -60,7 +62,7 @@ public class ImmutableValueStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testClear() {
+ public void testClear() throws IOException {
long value = valueState.value();
assertEquals(42L, value);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index 6ee7631..a49fdd2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.queryablestate.network;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
@@ -40,8 +39,6 @@ import org.junit.rules.TemporaryFolder;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
-import java.io.File;
-
import static org.mockito.Mockito.mock;
/**
@@ -54,43 +51,6 @@ public final class KVStateRequestSerializerRocksDBTest {
public TemporaryFolder temporaryFolder = new TemporaryFolder();
/**
- * Extension of {@link RocksDBKeyedStateBackend} to make {@link
- * #createListState(TypeSerializer, ListStateDescriptor)} public for use in
- * the tests.
- *
- * @param <K> key type
- */
- static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
-
- RocksDBKeyedStateBackend2(
- final String operatorIdentifier,
- final ClassLoader userCodeClassLoader,
- final File instanceBasePath,
- final DBOptions dbOptions,
- final ColumnFamilyOptions columnFamilyOptions,
- final TaskKvStateRegistry kvStateRegistry,
- final TypeSerializer<K> keySerializer,
- final int numberOfKeyGroups,
- final KeyGroupRange keyGroupRange,
- final ExecutionConfig executionConfig) throws Exception {
-
- super(operatorIdentifier, userCodeClassLoader,
- instanceBasePath,
- dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
- numberOfKeyGroups, keyGroupRange, executionConfig, false,
- TestLocalRecoveryConfig.disabled());
- }
-
- @Override
- public <N, T> InternalListState<K, N, T> createListState(
- final TypeSerializer<N> namespaceSerializer,
- final ListStateDescriptor<T> stateDesc) throws Exception {
-
- return super.createListState(namespaceSerializer, stateDesc);
- }
- }
-
- /**
* Tests list serialization and deserialization match.
*
* @see KvStateRequestSerializerTest#testListSerialization()
@@ -105,8 +65,8 @@ public final class KVStateRequestSerializerRocksDBTest {
DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
dbOptions.setCreateIfMissing(true);
ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
- final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
- new RocksDBKeyedStateBackend2<>(
+ final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
+ new RocksDBKeyedStateBackend<>(
"no-op",
ClassLoader.getSystemClassLoader(),
temporaryFolder.getRoot(),
@@ -115,13 +75,13 @@ public final class KVStateRequestSerializerRocksDBTest {
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
1, new KeyGroupRange(0, 0),
- new ExecutionConfig()
+ new ExecutionConfig(), false,
+ TestLocalRecoveryConfig.disabled()
);
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);
- final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend
- .createListState(VoidNamespaceSerializer.INSTANCE,
+ final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
KvStateRequestSerializerTest.testListSerialization(key, listState);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index 1dc7186..2ba7507 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -200,9 +200,9 @@ public class KvStateRequestSerializerTest {
);
longHeapKeyedStateBackend.setCurrentKey(key);
- final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState(
- VoidNamespaceSerializer.INSTANCE,
- new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+ final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(
+ VoidNamespaceSerializer.INSTANCE,
+ new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
testListSerialization(key, listState);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index f873655..1690240 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -20,32 +20,13 @@ package org.apache.flink.runtime.state;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateBinder;
import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
@@ -73,30 +54,30 @@ public abstract class AbstractKeyedStateBackend<K> implements
protected final TypeSerializer<K> keySerializer;
/** The currently active key. */
- protected K currentKey;
+ private K currentKey;
- /** The key group of the currently active key */
+ /** The key group of the currently active key. */
private int currentKeyGroup;
/** So that we can give out state when the user uses the same key. */
- protected final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
+ private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
- /** For caching the last accessed partitioned state */
+ /** For caching the last accessed partitioned state. */
private String lastName;
@SuppressWarnings("rawtypes")
private InternalKvState lastState;
- /** The number of key-groups aka max parallelism */
+ /** The number of key-groups aka max parallelism. */
protected final int numberOfKeyGroups;
- /** Range of key-groups for which this backend is responsible */
+ /** Range of key-groups for which this backend is responsible. */
protected final KeyGroupRange keyGroupRange;
- /** KvStateRegistry helper for this task */
+ /** KvStateRegistry helper for this task. */
protected final TaskKvStateRegistry kvStateRegistry;
- /** Registry for all opened streams, so they can be closed if the task using this backend is closed */
+ /** Registry for all opened streams, so they can be closed if the task using this backend is closed. */
protected CloseableRegistry cancelStreamRegistry;
protected final ClassLoader userCodeClassLoader;
@@ -153,87 +134,19 @@ public abstract class AbstractKeyedStateBackend<K> implements
}
/**
- * Creates and returns a new {@link ValueState}.
+ * Creates and returns a new {@link State}.
*
* @param namespaceSerializer TypeSerializer for the state namespace.
* @param stateDesc The {@code StateDescriptor} that contains the name of the state.
*
* @param <N> The type of the namespace.
- * @param <T> The type of the value that the {@code ValueState} can store.
+ * @param <SV> The type of the stored state value.
+ * @param <S> The type of the public API state.
+ * @param <IS> The type of internal state.
*/
- protected abstract <N, T> InternalValueState<K, N, T> createValueState(
- TypeSerializer<N> namespaceSerializer,
- ValueStateDescriptor<T> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link ListState}.
- *
- * @param namespaceSerializer TypeSerializer for the state namespace.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <N> The type of the namespace.
- * @param <T> The type of the values that the {@code ListState} can store.
- */
- protected abstract <N, T> InternalListState<K, N, T> createListState(
- TypeSerializer<N> namespaceSerializer,
- ListStateDescriptor<T> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link ReducingState}.
- *
- * @param namespaceSerializer TypeSerializer for the state namespace.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <N> The type of the namespace.
- * @param <T> The type of the values that the {@code ListState} can store.
- */
- protected abstract <N, T> InternalReducingState<K, N, T> createReducingState(
- TypeSerializer<N> namespaceSerializer,
- ReducingStateDescriptor<T> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link AggregatingState}.
- *
- * @param namespaceSerializer TypeSerializer for the state namespace.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <N> The type of the namespace.
- * @param <T> The type of the values that the {@code ListState} can store.
- */
- protected abstract <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
- TypeSerializer<N> namespaceSerializer,
- AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link FoldingState}.
- *
- * @param namespaceSerializer TypeSerializer for the state namespace.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <N> The type of the namespace.
- * @param <T> Type of the values folded into the state
- * @param <ACC> Type of the value in the state
- *
- * @deprecated will be removed in a future version
- */
- @Deprecated
- protected abstract <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
- TypeSerializer<N> namespaceSerializer,
- FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link MapState}.
- *
- * @param namespaceSerializer TypeSerializer for the state namespace.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <N> The type of the namespace.
- * @param <UK> Type of the keys in the state
- * @param <UV> Type of the values in the state *
- */
- protected abstract <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
- TypeSerializer<N> namespaceSerializer,
- MapStateDescriptor<UK, UV> stateDesc) throws Exception;
+ public abstract <N, SV, S extends State, IS extends S> IS createState(
+ TypeSerializer<N> namespaceSerializer,
+ StateDescriptor<S, SV> stateDesc) throws Exception;
/**
* @see KeyedStateBackend
@@ -311,8 +224,6 @@ public abstract class AbstractKeyedStateBackend<K> implements
throw new RuntimeException(e);
}
});
- } catch (RuntimeException e) {
- throw e;
}
}
@@ -320,6 +231,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
* @see KeyedStateBackend
*/
@Override
+ @SuppressWarnings("unchecked")
public <N, S extends State, V> S getOrCreateKeyedState(
final TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, V> stateDescriptor) throws Exception {
@@ -343,43 +255,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
return typedState;
}
- // create a new blank key/value state
- S state = stateDescriptor.bind(new StateBinder() {
- @Override
- public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
- return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
- }
-
- @Override
- public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
- return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
- }
-
- @Override
- public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
- return AbstractKeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
- }
-
- @Override
- public <T, ACC, R> AggregatingState<T, R> createAggregatingState(
- AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
- return AbstractKeyedStateBackend.this.createAggregatingState(namespaceSerializer, stateDesc);
- }
-
- @Override
- public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
- return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
- }
-
- @Override
- public <UK, UV> MapState<UK, UV> createMapState(MapStateDescriptor<UK, UV> stateDesc) throws Exception {
- return AbstractKeyedStateBackend.this.createMapState(namespaceSerializer, stateDesc);
- }
-
- });
-
- @SuppressWarnings("unchecked")
- InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
+ InternalKvState<K, N, ?> kvState = createState(namespaceSerializer, stateDescriptor);
keyValueStatesByName.put(stateDescriptor.getName(), kvState);
// Publish queryable state
@@ -392,14 +268,14 @@ public abstract class AbstractKeyedStateBackend<K> implements
kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
}
- return state;
+ return (S) kvState;
}
/**
* TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace.
* This method should be removed for the sake of namespaces being lazily fetched from the keyed
* state backend, or being set on the state directly.
- *
+ *
* @see KeyedStateBackend
*/
@SuppressWarnings("unchecked")
@@ -445,7 +321,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
}
@VisibleForTesting
- public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
+ StreamCompressionDecorator getKeyGroupCompressionDecorator() {
return keyGroupCompressionDecorator;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 97d1ad0..62aa30a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
@@ -40,7 +43,6 @@ import java.io.IOException;
class HeapAggregatingState<K, N, IN, ACC, OUT>
extends AbstractHeapMergingState<K, N, IN, ACC, OUT>
implements InternalAggregatingState<K, N, IN, ACC, OUT> {
-
private final AggregateTransformation<IN, ACC, OUT> aggregateTransformation;
/**
@@ -53,7 +55,7 @@ class HeapAggregatingState<K, N, IN, ACC, OUT>
* @param defaultValue The default value for the state.
* @param aggregateFunction The aggregating function used for aggregating state.
*/
- HeapAggregatingState(
+ private HeapAggregatingState(
StateTable<K, N, ACC> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<ACC> valueSerializer,
@@ -111,7 +113,7 @@ class HeapAggregatingState<K, N, IN, ACC, OUT>
// ------------------------------------------------------------------------
@Override
- protected ACC mergeState(ACC a, ACC b) throws Exception {
+ protected ACC mergeState(ACC a, ACC b) {
return aggregateTransformation.aggFunction.merge(a, b);
}
@@ -124,11 +126,25 @@ class HeapAggregatingState<K, N, IN, ACC, OUT>
}
@Override
- public ACC apply(ACC accumulator, IN value) throws Exception {
+ public ACC apply(ACC accumulator, IN value) {
if (accumulator == null) {
accumulator = aggFunction.createAccumulator();
}
return aggFunction.add(value, accumulator);
}
}
+
+ @SuppressWarnings("unchecked")
+ static <T, K, N, SV, S extends State, IS extends S> IS create(
+ StateDescriptor<S, SV> stateDesc,
+ StateTable<K, N, SV> stateTable,
+ TypeSerializer<K> keySerializer) {
+ return (IS) new HeapAggregatingState<>(
+ stateTable,
+ keySerializer,
+ stateTable.getStateSerializer(),
+ stateTable.getNamespaceSerializer(),
+ stateDesc.getDefaultValue(),
+ ((AggregatingStateDescriptor<T, SV, ?>) stateDesc).getAggregateFunction());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index d61c4c5..1bca719 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -41,7 +44,6 @@ import java.io.IOException;
class HeapFoldingState<K, N, T, ACC>
extends AbstractHeapAppendingState<K, N, T, ACC, ACC>
implements InternalFoldingState<K, N, T, ACC> {
-
/** The function used to fold the state. */
private final FoldTransformation foldTransformation;
@@ -55,7 +57,7 @@ class HeapFoldingState<K, N, T, ACC>
* @param defaultValue The default value for the state.
* @param foldFunction The fold function used for folding state.
*/
- HeapFoldingState(
+ private HeapFoldingState(
StateTable<K, N, ACC> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<ACC> valueSerializer,
@@ -118,4 +120,18 @@ class HeapFoldingState<K, N, T, ACC>
return foldFunction.fold((previousState != null) ? previousState : getDefaultValue(), value);
}
}
+
+ @SuppressWarnings("unchecked")
+ static <K, N, SV, S extends State, IS extends S> IS create(
+ StateDescriptor<S, SV> stateDesc,
+ StateTable<K, N, SV> stateTable,
+ TypeSerializer<K> keySerializer) {
+ return (IS) new HeapFoldingState<>(
+ stateTable,
+ keySerializer,
+ stateTable.getStateSerializer(),
+ stateTable.getNamespaceSerializer(),
+ stateDesc.getDefaultValue(),
+ ((FoldingStateDescriptor<SV, SV>) stateDesc).getFoldFunction());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 76479b0..82ce584 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -59,12 +59,7 @@ import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.runtime.state.internal.InternalFoldingState;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.function.SupplierWithException;
@@ -97,6 +92,23 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);
+ private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+ Stream.of(
+ Tuple2.of(ValueStateDescriptor.class, (StateFactory) HeapValueState::create),
+ Tuple2.of(ListStateDescriptor.class, (StateFactory) HeapListState::create),
+ Tuple2.of(MapStateDescriptor.class, (StateFactory) HeapMapState::create),
+ Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) HeapAggregatingState::create),
+ Tuple2.of(ReducingStateDescriptor.class, (StateFactory) HeapReducingState::create),
+ Tuple2.of(FoldingStateDescriptor.class, (StateFactory) HeapFoldingState::create)
+ ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+ private interface StateFactory {
+ <K, N, SV, S extends State, IS extends S> IS createState(
+ StateDescriptor<S, SV> stateDesc,
+ StateTable<K, N, SV> stateTable,
+ TypeSerializer<K> keySerializer) throws Exception;
+ }
+
/**
* Map of state tables that stores all state of key/value states. We store it centrally so
* that we can easily checkpoint/restore it.
@@ -110,8 +122,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Map of state names to their corresponding restored state meta info.
*
- * <p>
- * TODO this map can be removed when eager-state registration is in place.
+ * <p>TODO this map can be removed when eager-state registration is in place.
* TODO we currently need this cached to check state migration strategies when new serializers are registered.
*/
private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
@@ -203,91 +214,17 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- public <N, V> InternalValueState<K, N, V> createValueState(
- TypeSerializer<N> namespaceSerializer,
- ValueStateDescriptor<V> stateDesc) throws Exception {
-
- StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapValueState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue());
- }
-
- @Override
- public <N, T> InternalListState<K, N, T> createListState(
- TypeSerializer<N> namespaceSerializer,
- ListStateDescriptor<T> stateDesc) throws Exception {
-
- StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapListState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue());
- }
-
- @Override
- public <N, T> InternalReducingState<K, N, T> createReducingState(
- TypeSerializer<N> namespaceSerializer,
- ReducingStateDescriptor<T> stateDesc) throws Exception {
-
- StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapReducingState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getReduceFunction());
- }
-
- @Override
- public <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
- TypeSerializer<N> namespaceSerializer,
- AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
-
- StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapAggregatingState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getAggregateFunction());
- }
-
- @Override
- public <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
- TypeSerializer<N> namespaceSerializer,
- FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-
- StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapFoldingState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getFoldFunction());
- }
-
- @Override
- protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
- TypeSerializer<N> namespaceSerializer,
- MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-
- StateTable<K, N, Map<UK, UV>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-
- return new HeapMapState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue());
+ public <N, SV, S extends State, IS extends S> IS createState(
+ TypeSerializer<N> namespaceSerializer,
+ StateDescriptor<S, SV> stateDesc) throws Exception {
+ StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
+ if (stateFactory == null) {
+ String message = String.format("State %s is not supported by %s",
+ stateDesc.getClass(), this.getClass());
+ throw new FlinkRuntimeException(message);
+ }
+ StateTable<K, N, SV> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+ return stateFactory.createState(stateDesc, stateTable, keySerializer);
}
@Override
@@ -411,7 +348,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
int writtenKeyGroupIndex = inView.readInt();
try (InputStream kgCompressionInStream =
- streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
+ streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
DataInputViewStreamWrapper kgCompressionInView =
new DataInputViewStreamWrapper(kgCompressionInStream);