You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/03/29 15:59:41 UTC
[1/5] flink git commit: [FLINK-8908] Do not create a copy when
MapSerializer is stateless.
Repository: flink
Updated Branches:
refs/heads/master 98924f0ab -> db8e1f09b
[FLINK-8908] Do not create a copy when MapSerializer is stateless.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eea887b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eea887b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eea887b7
Branch: refs/heads/master
Commit: eea887b7d9a3cd43416feca568c9815d8362e8d4
Parents: 98924f0
Author: kkloudas <kk...@gmail.com>
Authored: Fri Mar 9 12:05:38 2018 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Mar 29 17:32:51 2018 +0200
----------------------------------------------------------------------
.../apache/flink/api/common/typeutils/base/MapSerializer.java | 4 +++-
.../java/org/apache/flink/runtime/state/HashMapSerializer.java | 4 +++-
2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/eea887b7/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index 23b494b..6471152 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -94,7 +94,9 @@ public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
TypeSerializer<K> duplicateKeySerializer = keySerializer.duplicate();
TypeSerializer<V> duplicateValueSerializer = valueSerializer.duplicate();
- return new MapSerializer<>(duplicateKeySerializer, duplicateValueSerializer);
+ return (duplicateKeySerializer == keySerializer) && (duplicateValueSerializer == valueSerializer)
+ ? this
+ : new MapSerializer<>(duplicateKeySerializer, duplicateValueSerializer);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/eea887b7/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
index 066684b..c1b6346 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
@@ -95,7 +95,9 @@ public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>>
TypeSerializer<K> duplicateKeySerializer = keySerializer.duplicate();
TypeSerializer<V> duplicateValueSerializer = valueSerializer.duplicate();
- return new HashMapSerializer<>(duplicateKeySerializer, duplicateValueSerializer);
+ return (duplicateKeySerializer == keySerializer) && (duplicateValueSerializer == valueSerializer)
+ ? this
+ : new HashMapSerializer<>(duplicateKeySerializer, duplicateValueSerializer);
}
@Override
[4/5] flink git commit: [FLINK-8802] [QS] Fix concurrent access to
non-duplicated serializers.
Posted by kk...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 717711d..41df1b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -905,7 +905,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ InternalKvState<Integer, VoidNamespace, String> kvState = (InternalKvState<Integer, VoidNamespace, String>) state;
// this is only available after the backend initialized the serializer
TypeSerializer<String> valueSerializer = kvId.getSerializer();
@@ -955,7 +955,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ValueState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+ InternalKvState<Integer, VoidNamespace, String> restoredKvState1 = (InternalKvState<Integer, VoidNamespace, String>) restored1;
backend.setCurrentKey(1);
assertEquals("1", restored1.value());
@@ -971,7 +971,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ValueState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+ InternalKvState<Integer, VoidNamespace, String> restoredKvState2 = (InternalKvState<Integer, VoidNamespace, String>) restored2;
backend.setCurrentKey(1);
assertEquals("u1", restored2.value());
@@ -987,7 +987,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
/**
- * Tests {@link ValueState#value()} and {@link InternalKvState#getSerializedValue(byte[])}
+ * Tests {@link ValueState#value()} and
+ * {@link InternalKvState#getSerializedValue(byte[], TypeSerializer, TypeSerializer, TypeSerializer)}
* accessing the state concurrently. They should not get in the way of each
* other.
*/
@@ -1012,7 +1013,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
final TypeSerializer<String> valueSerializer = kvId.getSerializer();
@SuppressWarnings("unchecked")
- final InternalKvState<Integer> kvState = (InternalKvState<Integer>) state;
+ final InternalKvState<Integer, Integer, String> kvState = (InternalKvState<Integer, Integer, String>) state;
/**
* 1) Test that ValueState#value() before and after
@@ -1232,7 +1233,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ InternalKvState<Integer, VoidNamespace, String> kvState = (InternalKvState<Integer, VoidNamespace, String>) state;
// this is only available after the backend initialized the serializer
TypeSerializer<String> valueSerializer = kvId.getElementSerializer();
@@ -1290,7 +1291,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ListState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+ InternalKvState<Integer, VoidNamespace, String> restoredKvState1 = (InternalKvState<Integer, VoidNamespace, String>) restored1;
backend.setCurrentKey(1);
assertEquals("1", joiner.join(restored1.get()));
@@ -1307,7 +1308,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ListState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+ InternalKvState<Integer, VoidNamespace, String> restoredKvState2 = (InternalKvState<Integer, VoidNamespace, String>) restored2;
backend.setCurrentKey(1);
assertEquals("1,u1", joiner.join(restored2.get()));
@@ -1560,8 +1561,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
final Integer namespace3 = 3;
try {
- InternalListState<Integer, Long> state =
- (InternalListState<Integer, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+ InternalListState<String, Integer, Long> state =
+ (InternalListState<String, Integer, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
// populate the different namespaces
// - abc spreads the values over three namespaces
@@ -1676,7 +1677,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ InternalKvState<Integer, VoidNamespace, String> kvState = (InternalKvState<Integer, VoidNamespace, String>) state;
// this is only available after the backend initialized the serializer
TypeSerializer<String> valueSerializer = kvId.getSerializer();
@@ -1726,7 +1727,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ReducingState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+ InternalKvState<Integer, VoidNamespace, String> restoredKvState1 = (InternalKvState<Integer, VoidNamespace, String>) restored1;
backend.setCurrentKey(1);
assertEquals("1", restored1.get());
@@ -1742,7 +1743,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ReducingState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+ InternalKvState<Integer, VoidNamespace, String> restoredKvState2 = (InternalKvState<Integer, VoidNamespace, String>) restored2;
backend.setCurrentKey(1);
assertEquals("1,u1", restored2.get());
@@ -1827,8 +1828,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
try {
- final InternalReducingState<Integer, Long> state =
- (InternalReducingState<Integer, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+ final InternalReducingState<String, Integer, Long> state =
+ (InternalReducingState<String, Integer, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
// populate the different namespaces
// - abc spreads the values over three namespaces
@@ -2000,8 +2001,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
try {
- InternalAggregatingState<Integer, Long, Long> state =
- (InternalAggregatingState<Integer, Long, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+ InternalAggregatingState<String, Integer, Long, Long, Long> state =
+ (InternalAggregatingState<String, Integer, Long, Long, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
// populate the different namespaces
// - abc spreads the values over three namespaces
@@ -2173,8 +2174,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
try {
- InternalAggregatingState<Integer, Long, Long> state =
- (InternalAggregatingState<Integer, Long, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+ InternalAggregatingState<String, Integer, Long, Long, Long> state =
+ (InternalAggregatingState<String, Integer, Long, Long, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
// populate the different namespaces
// - abc spreads the values over three namespaces
@@ -2292,7 +2293,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
FoldingState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ InternalKvState<Integer, VoidNamespace, String> kvState = (InternalKvState<Integer, VoidNamespace, String>) state;
// this is only available after the backend initialized the serializer
TypeSerializer<String> valueSerializer = kvId.getSerializer();
@@ -2343,7 +2344,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
FoldingState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+ InternalKvState<Integer, VoidNamespace, String> restoredKvState1 = (InternalKvState<Integer, VoidNamespace, String>) restored1;
backend.setCurrentKey(1);
assertEquals("Fold-Initial:,1", restored1.get());
@@ -2360,7 +2361,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
@SuppressWarnings("unchecked")
FoldingState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+ InternalKvState<Integer, VoidNamespace, String> restoredKvState2 = (InternalKvState<Integer, VoidNamespace, String>) restored2;
backend.setCurrentKey(1);
assertEquals("Fold-Initial:,101", restored2.get());
@@ -2388,7 +2389,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
MapState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ InternalKvState<Integer, VoidNamespace, Map<Integer, String>> kvState = (InternalKvState<Integer, VoidNamespace, Map<Integer, String>>) state;
// these are only available after the backend initialized the serializer
TypeSerializer<Integer> userKeySerializer = kvId.getKeySerializer();
@@ -2491,7 +2492,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
MapState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+ InternalKvState<Integer, VoidNamespace, Map<Integer, String>> restoredKvState1 = (InternalKvState<Integer, VoidNamespace, Map<Integer, String>>) restored1;
backend.setCurrentKey(1);
assertEquals("1", restored1.get(1));
@@ -2510,7 +2511,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
@SuppressWarnings("unchecked")
MapState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+ InternalKvState<Integer, VoidNamespace, Map<Integer, String>> restoredKvState2 = (InternalKvState<Integer, VoidNamespace, Map<Integer, String>>) restored2;
backend.setCurrentKey(1);
assertEquals("101", restored2.get(1));
@@ -3066,7 +3067,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
VoidNamespaceSerializer.INSTANCE,
desc);
- InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ InternalKvState<Integer, VoidNamespace, Integer> kvState = (InternalKvState<Integer, VoidNamespace, Integer>) state;
assertTrue(kvState instanceof AbstractHeapState);
kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -3088,7 +3089,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
VoidNamespaceSerializer.INSTANCE,
desc);
- InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ InternalKvState<Integer, VoidNamespace, Integer> kvState = (InternalKvState<Integer, VoidNamespace, Integer>) state;
assertTrue(kvState instanceof AbstractHeapState);
kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -3115,7 +3116,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
VoidNamespaceSerializer.INSTANCE,
desc);
- InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ InternalKvState<Integer, VoidNamespace, Integer> kvState = (InternalKvState<Integer, VoidNamespace, Integer>) state;
assertTrue(kvState instanceof AbstractHeapState);
kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -3142,7 +3143,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
VoidNamespaceSerializer.INSTANCE,
desc);
- InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ InternalKvState<Integer, VoidNamespace, Integer> kvState = (InternalKvState<Integer, VoidNamespace, Integer>) state;
assertTrue(kvState instanceof AbstractHeapState);
kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -3163,7 +3164,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
VoidNamespaceSerializer.INSTANCE,
desc);
- InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ InternalKvState<Integer, VoidNamespace, Map<Integer, String>> kvState = (InternalKvState<Integer, VoidNamespace, Map<Integer, String>>) state;
assertTrue(kvState instanceof AbstractHeapState);
kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -3325,7 +3326,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
// insert some data to the backend.
- InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState(
+ InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
@@ -3382,7 +3383,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
try {
backend = createKeyedBackend(IntSerializer.INSTANCE);
- InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState(
+ InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
@@ -3429,7 +3430,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
try {
backend = restoreKeyedBackend(IntSerializer.INSTANCE, stateHandle);
- InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState(
+ InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
@@ -3544,7 +3545,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
return;
}
- InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState(
+ InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createValueState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
@@ -3598,7 +3599,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
* if it is not null.
*/
protected static <V, K, N> V getSerializedValue(
- InternalKvState<N> kvState,
+ InternalKvState<K, N, V> kvState,
K key,
TypeSerializer<K> keySerializer,
N namespace,
@@ -3608,7 +3609,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
key, keySerializer, namespace, namespaceSerializer);
- byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace);
+ byte[] serializedValue = kvState.getSerializedValue(
+ serializedKeyAndNamespace,
+ kvState.getKeySerializer(),
+ kvState.getNamespaceSerializer(),
+ kvState.getValueSerializer()
+ );
if (serializedValue == null) {
return null;
@@ -3622,7 +3628,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
* if it is not null.
*/
private static <V, K, N> List<V> getSerializedList(
- InternalKvState<N> kvState,
+ InternalKvState<K, N, V> kvState,
K key,
TypeSerializer<K> keySerializer,
N namespace,
@@ -3632,7 +3638,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
key, keySerializer, namespace, namespaceSerializer);
- byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace);
+ byte[] serializedValue = kvState.getSerializedValue(
+ serializedKeyAndNamespace,
+ kvState.getKeySerializer(),
+ kvState.getNamespaceSerializer(),
+ kvState.getValueSerializer()
+ );
if (serializedValue == null) {
return null;
@@ -3646,7 +3657,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
* if it is not null.
*/
private static <UK, UV, K, N> Map<UK, UV> getSerializedMap(
- InternalKvState<N> kvState,
+ InternalKvState<K, N, Map<UK, UV>> kvState,
K key,
TypeSerializer<K> keySerializer,
N namespace,
@@ -3658,7 +3669,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
key, keySerializer, namespace, namespaceSerializer);
- byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace);
+ byte[] serializedValue = kvState.getSerializedValue(
+ serializedKeyAndNamespace,
+ kvState.getKeySerializer(),
+ kvState.getNamespaceSerializer(),
+ kvState.getValueSerializer()
+ );
if (serializedValue == null) {
return null;
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index 7d903cc..e646c64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -119,7 +119,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
try {
- InternalValueState<VoidNamespace, String> state =
+ InternalValueState<String, VoidNamespace, String> state =
stateBackend.createValueState(
new VoidNamespaceSerializer(),
stateDescriptor);
@@ -163,7 +163,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
stateBackend.restore(StateObjectCollection.singleton(stateHandle));
- InternalValueState<VoidNamespace, String> state = stateBackend.createValueState(
+ InternalValueState<String, VoidNamespace, String> state = stateBackend.createValueState(
new VoidNamespaceSerializer(),
stateDescriptor);
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
index d83aa9f..815ceae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -69,7 +69,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
- InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
+ InternalListState<String, Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
assertEquals(7, keyedBackend.numStateEntries());
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 8915261..8c59979 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -43,11 +43,11 @@ import java.io.IOException;
*
* @param <K> The type of the key.
* @param <N> The type of the namespace.
+ * @param <V> The type of values kept internally in state.
* @param <S> The type of {@link State}.
* @param <SD> The type of {@link StateDescriptor}.
*/
-public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, V>, V>
- implements InternalKvState<N>, State {
+public abstract class AbstractRocksDBState<K, N, V, S extends State, SD extends StateDescriptor<S, V>> implements InternalKvState<K, N, V>, State {
/** Serializer for the namespace. */
final TypeSerializer<N> namespaceSerializer;
@@ -114,25 +114,36 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
}
@Override
- @SuppressWarnings("unchecked")
- public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
- Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+ public byte[] getSerializedValue(
+ final byte[] serializedKeyAndNamespace,
+ final TypeSerializer<K> safeKeySerializer,
+ final TypeSerializer<N> safeNamespaceSerializer,
+ final TypeSerializer<V> safeValueSerializer) throws Exception {
+
+ Preconditions.checkNotNull(serializedKeyAndNamespace);
+ Preconditions.checkNotNull(safeKeySerializer);
+ Preconditions.checkNotNull(safeNamespaceSerializer);
+ Preconditions.checkNotNull(safeValueSerializer);
//TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
- Tuple2<K, N> des = KvStateSerializer.<K, N>deserializeKeyAndNamespace(
- serializedKeyAndNamespace,
- backend.getKeySerializer(),
- namespaceSerializer);
+ Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+ serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
- int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
+ int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
// we cannot reuse the keySerializationStream member since this method
// is called concurrently to the other ones and it may thus contain garbage
ByteArrayOutputStreamWithPos tmpKeySerializationStream = new ByteArrayOutputStreamWithPos(128);
DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView = new DataOutputViewStreamWrapper(tmpKeySerializationStream);
- writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
- tmpKeySerializationStream, tmpKeySerializationDateDataOutputView);
+ writeKeyWithGroupAndNamespace(
+ keyGroup,
+ keyAndNamespace.f0,
+ safeKeySerializer,
+ keyAndNamespace.f1,
+ safeNamespaceSerializer,
+ tmpKeySerializationStream,
+ tmpKeySerializationDateDataOutputView);
return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
}
@@ -151,11 +162,32 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
ByteArrayOutputStreamWithPos keySerializationStream,
DataOutputView keySerializationDataOutputView) throws IOException {
+ writeKeyWithGroupAndNamespace(
+ keyGroup,
+ key,
+ backend.getKeySerializer(),
+ namespace,
+ namespaceSerializer,
+ keySerializationStream,
+ keySerializationDataOutputView);
+ }
+
+ protected void writeKeyWithGroupAndNamespace(
+ final int keyGroup,
+ final K key,
+ final TypeSerializer<K> keySerializer,
+ final N namespace,
+ final TypeSerializer<N> namespaceSerializer,
+ final ByteArrayOutputStreamWithPos keySerializationStream,
+ final DataOutputView keySerializationDataOutputView) throws IOException {
+
Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
+ Preconditions.checkNotNull(keySerializer);
+ Preconditions.checkNotNull(namespaceSerializer);
keySerializationStream.reset();
RocksDBKeySerializationUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
- RocksDBKeySerializationUtils.writeKey(key, backend.getKeySerializer(), keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
+ RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 4dfc772..14671a5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -43,8 +43,8 @@ import java.util.Collection;
* @param <R> The type of the value returned from the state
*/
public class RocksDBAggregatingState<K, N, T, ACC, R>
- extends AbstractRocksDBState<K, N, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>, ACC>
- implements InternalAggregatingState<N, T, R> {
+ extends AbstractRocksDBState<K, N, ACC, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>>
+ implements InternalAggregatingState<K, N, T, ACC, R> {
/** Serializer for the values. */
private final TypeSerializer<ACC> valueSerializer;
@@ -73,6 +73,21 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
}
@Override
+ public TypeSerializer<K> getKeySerializer() {
+ return backend.getKeySerializer();
+ }
+
+ @Override
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ @Override
+ public TypeSerializer<ACC> getValueSerializer() {
+ return stateDesc.getSerializer();
+ }
+
+ @Override
public R get() throws IOException {
try {
// prepare the current key and namespace for RocksDB lookup
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index d886f44..6f5baff 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -44,8 +44,8 @@ import java.io.IOException;
*/
@Deprecated
public class RocksDBFoldingState<K, N, T, ACC>
- extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC>
- implements InternalFoldingState<N, T, ACC> {
+ extends AbstractRocksDBState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
+ implements InternalFoldingState<K, N, T, ACC> {
/** Serializer for the values. */
private final TypeSerializer<ACC> valueSerializer;
@@ -72,6 +72,21 @@ public class RocksDBFoldingState<K, N, T, ACC>
}
@Override
+ public TypeSerializer<K> getKeySerializer() {
+ return backend.getKeySerializer();
+ }
+
+ @Override
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ @Override
+ public TypeSerializer<ACC> getValueSerializer() {
+ return stateDesc.getSerializer();
+ }
+
+ @Override
public ACC get() {
try {
writeCurrentKeyWithGroupAndNamespace();
@@ -108,5 +123,4 @@ public class RocksDBFoldingState<K, N, T, ACC>
throw new RuntimeException("Error while adding data to RocksDB", e);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 41b7bd0..6a23181 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1203,7 +1203,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- protected <N, T> InternalValueState<N, T> createValueState(
+ protected <N, T> InternalValueState<K, N, T> createValueState(
TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<T> stateDesc) throws Exception {
@@ -1213,7 +1213,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- protected <N, T> InternalListState<N, T> createListState(
+ protected <N, T> InternalListState<K, N, T> createListState(
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<T> stateDesc) throws Exception {
@@ -1223,7 +1223,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- protected <N, T> InternalReducingState<N, T> createReducingState(
+ protected <N, T> InternalReducingState<K, N, T> createReducingState(
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<T> stateDesc) throws Exception {
@@ -1233,7 +1233,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- protected <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+ protected <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
TypeSerializer<N> namespaceSerializer,
AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
@@ -1242,7 +1242,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
+ protected <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
@@ -1252,7 +1252,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
+ protected <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
TypeSerializer<N> namespaceSerializer,
MapStateDescriptor<UK, UV> stateDesc) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 62c169b..d9bcb6a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -47,8 +47,8 @@ import java.util.List;
* @param <V> The type of the values in the list state.
*/
public class RocksDBListState<K, N, V>
- extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, List<V>>
- implements InternalListState<N, V> {
+ extends AbstractRocksDBState<K, N, List<V>, ListState<V>, ListStateDescriptor<V>>
+ implements InternalListState<K, N, V> {
/** Serializer for the values. */
private final TypeSerializer<V> valueSerializer;
@@ -75,6 +75,21 @@ public class RocksDBListState<K, N, V>
}
@Override
+ public TypeSerializer<K> getKeySerializer() {
+ return backend.getKeySerializer();
+ }
+
+ @Override
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ @Override
+ public TypeSerializer<List<V>> getValueSerializer() {
+ return stateDesc.getSerializer();
+ }
+
+ @Override
public Iterable<V> get() {
try {
writeCurrentKeyWithGroupAndNamespace();
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index af789ac..c75a2ed 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
@@ -52,14 +53,14 @@ import java.util.Map;
* {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
* we use the {@code merge()} call.
*
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
* @param <UK> The type of the keys in the map state.
* @param <UV> The type of the values in the map state.
*/
public class RocksDBMapState<K, N, UK, UV>
- extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>>
- implements InternalMapState<N, UK, UV> {
+ extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>, MapStateDescriptor<UK, UV>>
+ implements InternalMapState<K, N, UK, UV, Map<UK, UV>> {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
@@ -76,7 +77,8 @@ public class RocksDBMapState<K, N, UK, UV>
* @param namespaceSerializer The serializer for the namespace.
* @param stateDesc The state identifier for the state.
*/
- public RocksDBMapState(ColumnFamilyHandle columnFamily,
+ public RocksDBMapState(
+ ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
MapStateDescriptor<UK, UV> stateDesc,
RocksDBKeyedStateBackend<K> backend) {
@@ -87,6 +89,21 @@ public class RocksDBMapState<K, N, UK, UV>
this.userValueSerializer = stateDesc.getValueSerializer();
}
+ @Override
+ public TypeSerializer<K> getKeySerializer() {
+ return backend.getKeySerializer();
+ }
+
+ @Override
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ @Override
+ public TypeSerializer<Map<UK, UV>> getValueSerializer() {
+ return stateDesc.getSerializer();
+ }
+
// ------------------------------------------------------------------------
// MapState Implementation
// ------------------------------------------------------------------------
@@ -158,7 +175,7 @@ public class RocksDBMapState<K, N, UK, UV>
return new Iterable<UK>() {
@Override
public Iterator<UK> iterator() {
- return new RocksDBMapIterator<UK>(backend.db, prefixBytes) {
+ return new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
@Override
public UK next() {
RocksDBMapEntry entry = nextEntry();
@@ -176,7 +193,7 @@ public class RocksDBMapState<K, N, UK, UV>
return new Iterable<UV>() {
@Override
public Iterator<UV> iterator() {
- return new RocksDBMapIterator<UV>(backend.db, prefixBytes) {
+ return new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
@Override
public UV next() {
RocksDBMapEntry entry = nextEntry();
@@ -191,7 +208,7 @@ public class RocksDBMapState<K, N, UK, UV>
public Iterator<Map.Entry<UK, UV>> iterator() throws IOException, RocksDBException {
final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
- return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes) {
+ return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
@Override
public Map.Entry<UK, UV> next() {
return nextEntry();
@@ -215,23 +232,48 @@ public class RocksDBMapState<K, N, UK, UV>
@Override
@SuppressWarnings("unchecked")
- public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
- Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+ public byte[] getSerializedValue(
+ final byte[] serializedKeyAndNamespace,
+ final TypeSerializer<K> safeKeySerializer,
+ final TypeSerializer<N> safeNamespaceSerializer,
+ final TypeSerializer<Map<UK, UV>> safeValueSerializer) throws Exception {
+
+ Preconditions.checkNotNull(serializedKeyAndNamespace);
+ Preconditions.checkNotNull(safeKeySerializer);
+ Preconditions.checkNotNull(safeNamespaceSerializer);
+ Preconditions.checkNotNull(safeValueSerializer);
//TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
- Tuple2<K, N> des = KvStateSerializer.deserializeKeyAndNamespace(
- serializedKeyAndNamespace,
- backend.getKeySerializer(),
- namespaceSerializer);
+ Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+ serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
- int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
+ int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128);
DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
- writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1, outputStream, outputView);
+
+ writeKeyWithGroupAndNamespace(
+ keyGroup,
+ keyAndNamespace.f0,
+ safeKeySerializer,
+ keyAndNamespace.f1,
+ safeNamespaceSerializer,
+ outputStream,
+ outputView);
+
final byte[] keyPrefixBytes = outputStream.toByteArray();
- final Iterator<Map.Entry<UK, UV>> iterator = new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, keyPrefixBytes) {
+ final MapSerializer<UK, UV> serializer = (MapSerializer<UK, UV>) safeValueSerializer;
+
+ final TypeSerializer<UK> dupUserKeySerializer = serializer.getKeySerializer();
+ final TypeSerializer<UV> dupUserValueSerializer = serializer.getValueSerializer();
+
+ final Iterator<Map.Entry<UK, UV>> iterator = new RocksDBMapIterator<Map.Entry<UK, UV>>(
+ backend.db,
+ keyPrefixBytes,
+ dupUserKeySerializer,
+ dupUserValueSerializer) {
+
@Override
public Map.Entry<UK, UV> next() {
return nextEntry();
@@ -248,7 +290,7 @@ public class RocksDBMapState<K, N, UK, UV>
public Iterator<Map.Entry<UK, UV>> iterator() {
return iterator;
}
- }, userKeySerializer, userValueSerializer);
+ }, dupUserKeySerializer, dupUserValueSerializer);
}
// ------------------------------------------------------------------------
@@ -270,34 +312,42 @@ public class RocksDBMapState<K, N, UK, UV>
}
private byte[] serializeUserValue(UV userValue) throws IOException {
+ return serializeUserValue(userValue, userValueSerializer);
+ }
+
+ private UV deserializeUserValue(byte[] rawValueBytes) throws IOException {
+ return deserializeUserValue(rawValueBytes, userValueSerializer);
+ }
+
+ private byte[] serializeUserValue(UV userValue, TypeSerializer<UV> valueSerializer) throws IOException {
keySerializationStream.reset();
if (userValue == null) {
keySerializationDataOutputView.writeBoolean(true);
} else {
keySerializationDataOutputView.writeBoolean(false);
- userValueSerializer.serialize(userValue, keySerializationDataOutputView);
+ valueSerializer.serialize(userValue, keySerializationDataOutputView);
}
return keySerializationStream.toByteArray();
}
- private UK deserializeUserKey(byte[] rawKeyBytes) throws IOException {
+ private UK deserializeUserKey(byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException {
ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes);
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
in.skipBytes(userKeyOffset);
- return userKeySerializer.deserialize(in);
+ return keySerializer.deserialize(in);
}
- private UV deserializeUserValue(byte[] rawValueBytes) throws IOException {
+ private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSerializer) throws IOException {
ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawValueBytes);
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
boolean isNull = in.readBoolean();
- return isNull ? null : userValueSerializer.deserialize(in);
+ return isNull ? null : valueSerializer.deserialize(in);
}
// ------------------------------------------------------------------------
@@ -320,15 +370,25 @@ public class RocksDBMapState<K, N, UK, UV>
/** The user key and value. The deserialization is performed lazily, i.e. the key
* and the value is deserialized only when they are accessed. */
- private UK userKey = null;
- private UV userValue = null;
+ private UK userKey;
+
+ private UV userValue;
+
+ private TypeSerializer<UK> keySerializer;
+
+ private TypeSerializer<UV> valueSerializer;
RocksDBMapEntry(
- @Nonnull final RocksDB db,
- @Nonnull final byte[] rawKeyBytes,
- @Nonnull final byte[] rawValueBytes) {
+ @Nonnull final RocksDB db,
+ @Nonnull final byte[] rawKeyBytes,
+ @Nonnull final byte[] rawValueBytes,
+ @Nonnull final TypeSerializer<UK> keySerializer,
+ @Nonnull final TypeSerializer<UV> valueSerializer) {
this.db = db;
+ this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
+
this.rawKeyBytes = rawKeyBytes;
this.rawValueBytes = rawValueBytes;
this.deleted = false;
@@ -349,9 +409,9 @@ public class RocksDBMapState<K, N, UK, UV>
public UK getKey() {
if (userKey == null) {
try {
- userKey = deserializeUserKey(rawKeyBytes);
+ userKey = deserializeUserKey(rawKeyBytes, keySerializer);
} catch (IOException e) {
- throw new RuntimeException("Error while deserializing the user key.");
+ throw new RuntimeException("Error while deserializing the user key.", e);
}
}
@@ -365,9 +425,9 @@ public class RocksDBMapState<K, N, UK, UV>
} else {
if (userValue == null) {
try {
- userValue = deserializeUserValue(rawValueBytes);
+ userValue = deserializeUserValue(rawValueBytes, valueSerializer);
} catch (IOException e) {
- throw new RuntimeException("Error while deserializing the user value.");
+ throw new RuntimeException("Error while deserializing the user value.", e);
}
}
@@ -385,7 +445,7 @@ public class RocksDBMapState<K, N, UK, UV>
try {
userValue = value;
- rawValueBytes = serializeUserValue(value);
+ rawValueBytes = serializeUserValue(value, valueSerializer);
db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
} catch (IOException | RocksDBException e) {
@@ -399,14 +459,14 @@ public class RocksDBMapState<K, N, UK, UV>
/** An auxiliary utility to scan all entries under the given key. */
private abstract class RocksDBMapIterator<T> implements Iterator<T> {
- static final int CACHE_SIZE_LIMIT = 128;
+ private static final int CACHE_SIZE_LIMIT = 128;
/** The db where data resides. */
private final RocksDB db;
/**
* The prefix bytes of the key being accessed. All entries under the same key
- * has the same prefix, hence we can stop the iterating once coming across an
+ * have the same prefix, hence we can stop iterating once coming across an
* entry with a different prefix.
*/
private final byte[] keyPrefixBytes;
@@ -421,9 +481,19 @@ public class RocksDBMapState<K, N, UK, UV>
private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();
private int cacheIndex = 0;
- RocksDBMapIterator(final RocksDB db, final byte[] keyPrefixBytes) {
+ private final TypeSerializer<UK> keySerializer;
+ private final TypeSerializer<UV> valueSerializer;
+
+ RocksDBMapIterator(
+ final RocksDB db,
+ final byte[] keyPrefixBytes,
+ final TypeSerializer<UK> keySerializer,
+ final TypeSerializer<UV> valueSerializer) {
+
this.db = db;
this.keyPrefixBytes = keyPrefixBytes;
+ this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
}
@Override
@@ -436,7 +506,7 @@ public class RocksDBMapState<K, N, UK, UV>
@Override
public void remove() {
if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) {
- throw new IllegalStateException("The remove operation must be called after an valid next operation.");
+ throw new IllegalStateException("The remove operation must be called after a valid next operation.");
}
RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1);
@@ -503,7 +573,13 @@ public class RocksDBMapState<K, N, UK, UV>
break;
}
- RocksDBMapEntry entry = new RocksDBMapEntry(db, iterator.key(), iterator.value());
+ RocksDBMapEntry entry = new RocksDBMapEntry(
+ db,
+ iterator.key(),
+ iterator.value(),
+ keySerializer,
+ valueSerializer);
+
cacheEntries.add(entry);
iterator.next();
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 2a7f6e0..bef5e40 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -42,8 +42,8 @@ import java.util.Collection;
* @param <V> The type of value that the state state stores.
*/
public class RocksDBReducingState<K, N, V>
- extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, V>
- implements InternalReducingState<N, V> {
+ extends AbstractRocksDBState<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>>
+ implements InternalReducingState<K, N, V> {
/** Serializer for the values. */
private final TypeSerializer<V> valueSerializer;
@@ -69,6 +69,21 @@ public class RocksDBReducingState<K, N, V>
}
@Override
+ public TypeSerializer<K> getKeySerializer() {
+ return backend.getKeySerializer();
+ }
+
+ @Override
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ @Override
+ public TypeSerializer<V> getValueSerializer() {
+ return stateDesc.getSerializer();
+ }
+
+ @Override
public V get() {
try {
writeCurrentKeyWithGroupAndNamespace();
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 99718be..6d7fb56 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -39,8 +39,8 @@ import java.io.IOException;
* @param <V> The type of value that the state state stores.
*/
public class RocksDBValueState<K, N, V>
- extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>, V>
- implements InternalValueState<N, V> {
+ extends AbstractRocksDBState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
+ implements InternalValueState<K, N, V> {
/** Serializer for the values. */
private final TypeSerializer<V> valueSerializer;
@@ -62,6 +62,21 @@ public class RocksDBValueState<K, N, V>
}
@Override
+ public TypeSerializer<K> getKeySerializer() {
+ return backend.getKeySerializer();
+ }
+
+ @Override
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ @Override
+ public TypeSerializer<V> getValueSerializer() {
+ return stateDesc.getSerializer();
+ }
+
+ @Override
public V value() {
try {
writeCurrentKeyWithGroupAndNamespace();
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index fd01446..65cf167 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -76,7 +76,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
private transient EvictorContext evictorContext;
- private transient InternalListState<W, StreamRecord<IN>> evictingWindowState;
+ private transient InternalListState<K, W, StreamRecord<IN>> evictingWindowState;
// ------------------------------------------------------------------------
@@ -428,7 +428,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
super.open();
evictorContext = new EvictorContext(null, null);
- evictingWindowState = (InternalListState<W, StreamRecord<IN>>)
+ evictingWindowState = (InternalListState<K, W, StreamRecord<IN>>)
getOrCreateKeyedState(windowSerializer, evictingWindowStateDescriptor);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index aa19168..9e3898c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -144,16 +144,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// ------------------------------------------------------------------------
/** The state in which the window contents is stored. Each window is a namespace */
- private transient InternalAppendingState<W, IN, ACC> windowState;
+ private transient InternalAppendingState<K, W, IN, ACC, ACC> windowState;
/**
* The {@link #windowState}, typed to merging state for merging windows.
* Null if the window state is not mergeable.
*/
- private transient InternalMergingState<W, IN, ACC> windowMergingState;
+ private transient InternalMergingState<K, W, IN, ACC, ACC> windowMergingState;
/** The state that holds the merging window metadata (the sets that describe what is merged). */
- private transient InternalListState<VoidNamespace, Tuple2<W, W>> mergingSetsState;
+ private transient InternalListState<K, VoidNamespace, Tuple2<W, W>> mergingSetsState;
/**
* This is given to the {@code InternalWindowFunction} for emitting elements with a given
@@ -234,7 +234,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// create (or restore) the state that hold the actual window contents
// NOTE - the state may be null in the case of the overriding evicting window operator
if (windowStateDescriptor != null) {
- windowState = (InternalAppendingState<W, IN, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
+ windowState = (InternalAppendingState<K, W, IN, ACC, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
}
// create the typed and helper states for merging windows
@@ -242,7 +242,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// store a typed reference for the state of merging windows - sanity check
if (windowState instanceof InternalMergingState) {
- windowMergingState = (InternalMergingState<W, IN, ACC>) windowState;
+ windowMergingState = (InternalMergingState<K, W, IN, ACC, ACC>) windowState;
}
// TODO this sanity check should be here, but is prevented by an incorrect test (pending validation)
// TODO see WindowOperatorTest.testCleanupTimerWithEmptyFoldingStateForSessionWindows()
@@ -263,7 +263,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
new ListStateDescriptor<>("merging-window-set", tupleSerializer);
// get the state that stores the merging sets
- mergingSetsState = (InternalListState<VoidNamespace, Tuple2<W, W>>)
+ mergingSetsState = (InternalListState<K, VoidNamespace, Tuple2<W, W>>)
getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor);
mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
}
@@ -883,7 +883,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
if (rawState instanceof InternalMergingState) {
@SuppressWarnings("unchecked")
- InternalMergingState<W, ?, ?> mergingState = (InternalMergingState<W, ?, ?>) rawState;
+ InternalMergingState<K, W, ?, ?, ?> mergingState = (InternalMergingState<K, W, ?, ?, ?>) rawState;
mergingState.mergeNamespaces(window, mergedWindows);
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
index 5aa47e8..1536956 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
@@ -379,7 +379,7 @@ public class TriggerTestHarness<T, W extends Window> {
if (rawState instanceof InternalMergingState) {
@SuppressWarnings("unchecked")
- InternalMergingState<W, ?, ?> mergingState = (InternalMergingState<W, ?, ?>) rawState;
+ InternalMergingState<K, W, ?, ?, ?> mergingState = (InternalMergingState<K, W, ?, ?, ?>) rawState;
mergingState.mergeNamespaces(window, mergedWindows);
}
else {
[5/5] flink git commit: [FLINK-8802] [QS] Fix concurrent access to
non-duplicated serializers.
Posted by kk...@apache.org.
[FLINK-8802] [QS] Fix concurrent access to non-duplicated serializers.
This closes #5691.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db8e1f09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db8e1f09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db8e1f09
Branch: refs/heads/master
Commit: db8e1f09bd7dcd9f392bf987e96cddcb34665b6c
Parents: c16e2c9
Author: kkloudas <kk...@gmail.com>
Authored: Fri Mar 9 22:47:35 2018 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Mar 29 17:35:39 2018 +0200
----------------------------------------------------------------------
.../flink/queryablestate/network/Client.java | 2 +-
.../server/KvStateServerHandler.java | 26 ++-
.../queryablestate/network/ClientTest.java | 4 +-
.../KVStateRequestSerializerRocksDBTest.java | 16 +-
.../network/KvStateRequestSerializerTest.java | 30 ++-
.../network/KvStateServerHandlerTest.java | 46 +++-
.../flink/runtime/query/KvStateEntry.java | 75 +++++++
.../apache/flink/runtime/query/KvStateInfo.java | 114 ++++++++++
.../flink/runtime/query/KvStateRegistry.java | 21 +-
.../runtime/query/TaskKvStateRegistry.java | 4 +-
.../state/AbstractKeyedStateBackend.java | 23 +-
.../state/heap/AbstractHeapMergingState.java | 9 +-
.../runtime/state/heap/AbstractHeapState.java | 31 ++-
.../state/heap/HeapAggregatingState.java | 19 +-
.../runtime/state/heap/HeapFoldingState.java | 20 +-
.../state/heap/HeapKeyedStateBackend.java | 16 +-
.../flink/runtime/state/heap/HeapListState.java | 47 +++-
.../flink/runtime/state/heap/HeapMapState.java | 55 +++--
.../runtime/state/heap/HeapReducingState.java | 20 +-
.../runtime/state/heap/HeapValueState.java | 20 +-
.../internal/InternalAggregatingState.java | 13 +-
.../state/internal/InternalAppendingState.java | 12 +-
.../state/internal/InternalFoldingState.java | 5 +-
.../runtime/state/internal/InternalKvState.java | 37 +++-
.../state/internal/InternalListState.java | 6 +-
.../state/internal/InternalMapState.java | 5 +-
.../state/internal/InternalMergingState.java | 12 +-
.../state/internal/InternalReducingState.java | 5 +-
.../state/internal/InternalValueState.java | 5 +-
.../runtime/query/KvStateRegistryTest.java | 212 ++++++++++++++++++-
.../runtime/state/StateBackendTestBase.java | 96 +++++----
.../state/StateSnapshotCompressionTest.java | 4 +-
...pKeyedStateBackendSnapshotMigrationTest.java | 2 +-
.../streaming/state/AbstractRocksDBState.java | 58 +++--
.../state/RocksDBAggregatingState.java | 19 +-
.../streaming/state/RocksDBFoldingState.java | 20 +-
.../state/RocksDBKeyedStateBackend.java | 12 +-
.../streaming/state/RocksDBListState.java | 19 +-
.../streaming/state/RocksDBMapState.java | 152 +++++++++----
.../streaming/state/RocksDBReducingState.java | 19 +-
.../streaming/state/RocksDBValueState.java | 19 +-
.../windowing/EvictingWindowOperator.java | 4 +-
.../operators/windowing/WindowOperator.java | 14 +-
.../operators/windowing/TriggerTestHarness.java | 2 +-
44 files changed, 1074 insertions(+), 276 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index 364f835..6b60492 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -281,7 +281,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
* @param request the request to be sent.
* @return Future holding the serialized result
*/
- public CompletableFuture<RESP> sendRequest(REQ request) {
+ CompletableFuture<RESP> sendRequest(REQ request) {
synchronized (connectLock) {
if (failureCause != null) {
return FutureUtils.getFailedFuture(failureCause);
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
index 18a2944..d46deff 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -26,6 +26,8 @@ import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.runtime.query.KvStateEntry;
+import org.apache.flink.runtime.query.KvStateInfo;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.ExceptionUtils;
@@ -33,9 +35,6 @@ import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.concurrent.CompletableFuture;
/**
@@ -50,8 +49,6 @@ import java.util.concurrent.CompletableFuture;
@ChannelHandler.Sharable
public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {
- private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class);
-
/** KvState registry holding references to the KvState instances. */
private final KvStateRegistry registry;
@@ -78,13 +75,13 @@ public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalR
final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
try {
- final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
+ final KvStateEntry<?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
- byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
+ byte[] serializedResult = getSerializedValue(kvState, serializedKeyAndNamespace);
if (serializedResult != null) {
responseFuture.complete(new KvStateResponse(serializedResult));
} else {
@@ -100,6 +97,21 @@ public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalR
}
}
+ private static <K, N, V> byte[] getSerializedValue(
+ final KvStateEntry<K, N, V> entry,
+ final byte[] serializedKeyAndNamespace) throws Exception {
+
+ final InternalKvState<K, N, V> state = entry.getState();
+ final KvStateInfo<K, N, V> infoForCurrentThread = entry.getInfoForCurrentThread();
+
+ return state.getSerializedValue(
+ serializedKeyAndNamespace,
+ infoForCurrentThread.getKeySerializer(),
+ infoForCurrentThread.getNamespaceSerializer(),
+ infoForCurrentThread.getStateValueSerializer()
+ );
+ }
+
@Override
public CompletableFuture<Void> shutdown() {
return CompletableFuture.completedFuture(null);
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index 6aa4710..bceb361 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -685,8 +685,8 @@ public class ClientTest {
state.update(201 + i);
- // we know it must be a KvStat but this is not exposed to the user via State
- InternalKvState<?> kvState = (InternalKvState<?>) state;
+ // we know it must be a KvState but this is not exposed to the user via State
+ InternalKvState<Integer, ?, Integer> kvState = (InternalKvState<Integer, ?, Integer>) state;
// Register KvState (one state instance for all server)
ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index dd75dd6..4985bf3 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -41,6 +41,7 @@ import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import java.io.File;
+import java.util.Map;
import static org.mockito.Mockito.mock;
@@ -82,7 +83,7 @@ public final class KVStateRequestSerializerRocksDBTest {
}
@Override
- public <N, T> InternalListState<N, T> createListState(
+ public <N, T> InternalListState<K, N, T> createListState(
final TypeSerializer<N> namespaceSerializer,
final ListStateDescriptor<T> stateDesc) throws Exception {
@@ -120,7 +121,7 @@ public final class KVStateRequestSerializerRocksDBTest {
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);
- final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend
+ final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend
.createListState(VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
@@ -159,11 +160,12 @@ public final class KVStateRequestSerializerRocksDBTest {
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);
- final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>)
- longHeapKeyedStateBackend.getPartitionedState(
- VoidNamespace.INSTANCE,
- VoidNamespaceSerializer.INSTANCE,
- new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+ final InternalMapState<Long, VoidNamespace, Long, String, Map<Long, String>> mapState =
+ (InternalMapState<Long, VoidNamespace, Long, String, Map<Long, String>>)
+ longHeapKeyedStateBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
KvStateRequestSerializerTest.testMapSerialization(key, mapState);
longHeapKeyedStateBackend.dispose();
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index 8d10141..dac1b90 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -200,7 +200,7 @@ public class KvStateRequestSerializerTest {
);
longHeapKeyedStateBackend.setCurrentKey(key);
- final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState(
+ final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState(
VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
@@ -220,7 +220,7 @@ public class KvStateRequestSerializerTest {
*/
public static void testListSerialization(
final long key,
- final InternalListState<VoidNamespace, Long> listState) throws Exception {
+ final InternalListState<Long, VoidNamespace, Long> listState) throws Exception {
TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
listState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -240,7 +240,11 @@ public class KvStateRequestSerializerTest {
key, LongSerializer.INSTANCE,
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
- final byte[] serializedValues = listState.getSerializedValue(serializedKey);
+ final byte[] serializedValues = listState.getSerializedValue(
+ serializedKey,
+ listState.getKeySerializer(),
+ listState.getNamespaceSerializer(),
+ listState.getValueSerializer());
List<Long> actualValues = KvStateSerializer.deserializeList(serializedValues, valueSerializer);
assertEquals(expectedValues, actualValues);
@@ -303,10 +307,12 @@ public class KvStateRequestSerializerTest {
);
longHeapKeyedStateBackend.setCurrentKey(key);
- final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>) longHeapKeyedStateBackend.getPartitionedState(
- VoidNamespace.INSTANCE,
- VoidNamespaceSerializer.INSTANCE,
- new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+ final InternalMapState<Long, VoidNamespace, Long, String, HashMap<Long, String>> mapState =
+ (InternalMapState<Long, VoidNamespace, Long, String, HashMap<Long, String>>)
+ longHeapKeyedStateBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
testMapSerialization(key, mapState);
}
@@ -322,9 +328,9 @@ public class KvStateRequestSerializerTest {
*
* @throws Exception
*/
- public static void testMapSerialization(
+ public static <M extends Map<Long, String>> void testMapSerialization(
final long key,
- final InternalMapState<VoidNamespace, Long, String> mapState) throws Exception {
+ final InternalMapState<Long, VoidNamespace, Long, String, M> mapState) throws Exception {
TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;
@@ -348,7 +354,11 @@ public class KvStateRequestSerializerTest {
key, LongSerializer.INSTANCE,
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
- final byte[] serializedValues = mapState.getSerializedValue(serializedKey);
+ final byte[] serializedValues = mapState.getSerializedValue(
+ serializedKey,
+ mapState.getKeySerializer(),
+ mapState.getNamespaceSerializer(),
+ mapState.getValueSerializer());
Map<Long, String> actualValues = KvStateSerializer.deserializeMap(serializedValues, userKeySerializer, userValueSerializer);
assertEquals(expectedValues.size(), actualValues.size());
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 8b1517c..9947dac 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -21,7 +21,9 @@ package org.apache.flink.queryablestate.network;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.queryablestate.KvStateID;
@@ -70,9 +72,6 @@ import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
* Tests for {@link KvStateServerHandler}.
@@ -286,7 +285,7 @@ public class KvStateServerHandlerTest extends TestLogger {
}
/**
- * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])} call.
+ * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[], TypeSerializer, TypeSerializer, TypeSerializer)} call.
*/
@Test
public void testFailureOnGetSerializedValue() throws Exception {
@@ -300,9 +299,42 @@ public class KvStateServerHandlerTest extends TestLogger {
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
// Failing KvState
- InternalKvState<?> kvState = mock(InternalKvState.class);
- when(kvState.getSerializedValue(any(byte[].class)))
- .thenThrow(new RuntimeException("Expected test Exception"));
+ InternalKvState<Integer, VoidNamespace, Long> kvState =
+ new InternalKvState<Integer, VoidNamespace, Long>() {
+ @Override
+ public TypeSerializer<Integer> getKeySerializer() {
+ return IntSerializer.INSTANCE;
+ }
+
+ @Override
+ public TypeSerializer<VoidNamespace> getNamespaceSerializer() {
+ return VoidNamespaceSerializer.INSTANCE;
+ }
+
+ @Override
+ public TypeSerializer<Long> getValueSerializer() {
+ return LongSerializer.INSTANCE;
+ }
+
+ @Override
+ public void setCurrentNamespace(VoidNamespace namespace) {
+ // do nothing
+ }
+
+ @Override
+ public byte[] getSerializedValue(
+ final byte[] serializedKeyAndNamespace,
+ final TypeSerializer<Integer> safeKeySerializer,
+ final TypeSerializer<VoidNamespace> safeNamespaceSerializer,
+ final TypeSerializer<Long> safeValueSerializer) throws Exception {
+ throw new RuntimeException("Expected test Exception");
+ }
+
+ @Override
+ public void clear() {
+
+ }
+ };
KvStateID kvStateId = registry.registerKvState(
new JobID(),
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
new file mode 100644
index 0000000..0bd132f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * An entry holding the {@link InternalKvState} along with its {@link KvStateInfo}.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace the state is associated to
+ * @param <V> The type of values kept internally in state
+ */
+@Internal
+public class KvStateEntry<K, N, V> {
+
+ private final InternalKvState<K, N, V> state;
+ private final KvStateInfo<K, N, V> stateInfo;
+
+ private final boolean areSerializersStateless;
+
+ private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache;
+
+ public KvStateEntry(final InternalKvState<K, N, V> state) {
+ this.state = Preconditions.checkNotNull(state);
+ this.stateInfo = new KvStateInfo<>(
+ state.getKeySerializer(),
+ state.getNamespaceSerializer(),
+ state.getValueSerializer()
+ );
+ this.serializerCache = new ConcurrentHashMap<>();
+ this.areSerializersStateless = stateInfo.duplicate() == stateInfo;
+ }
+
+ public InternalKvState<K, N, V> getState() {
+ return state;
+ }
+
+ public KvStateInfo<K, N, V> getInfoForCurrentThread() {
+ return areSerializersStateless
+ ? stateInfo
+ : serializerCache.computeIfAbsent(Thread.currentThread(), t -> stateInfo.duplicate());
+ }
+
+ public void clear() {
+ serializerCache.clear();
+ }
+
+ @VisibleForTesting
+ public int getCacheSize() {
+ return serializerCache.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java
new file mode 100644
index 0000000..aa94e41
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * Metadata about a {@link InternalKvState}. This includes the serializers for
+ * the key, the namespace, and the values kept in the state.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace the state is associated to
+ * @param <V> The type of values kept internally in state
+ */
+public class KvStateInfo<K, N, V> {
+
+ private final TypeSerializer<K> keySerializer;
+ private final TypeSerializer<N> namespaceSerializer;
+ private final TypeSerializer<V> stateValueSerializer;
+
+ public KvStateInfo(
+ final TypeSerializer<K> keySerializer,
+ final TypeSerializer<N> namespaceSerializer,
+ final TypeSerializer<V> stateValueSerializer
+ ) {
+ this.keySerializer = Preconditions.checkNotNull(keySerializer);
+ this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
+ this.stateValueSerializer = Preconditions.checkNotNull(stateValueSerializer);
+ }
+
+ /**
+ * @return The serializer for the key the state is associated to.
+ */
+ public TypeSerializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ /**
+ * @return The serializer for the namespace the state is associated to.
+ */
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ /**
+ * @return The serializer for the values kept in the state.
+ */
+ public TypeSerializer<V> getStateValueSerializer() {
+ return stateValueSerializer;
+ }
+
+ /**
+ * Creates a deep copy of the current {@link KvStateInfo} by duplicating
+ * all the included serializers.
+ *
+ * <p>This method assumes correct implementation of the {@link TypeSerializer#duplicate()}
+ * method of the included serializers.
+ */
+ public KvStateInfo<K, N, V> duplicate() {
+ final TypeSerializer<K> dupKeySerializer = keySerializer.duplicate();
+ final TypeSerializer<N> dupNamespaceSerializer = namespaceSerializer.duplicate();
+ final TypeSerializer<V> dupSVSerializer = stateValueSerializer.duplicate();
+
+ if (
+ dupKeySerializer == keySerializer &&
+ dupNamespaceSerializer == namespaceSerializer &&
+ dupSVSerializer == stateValueSerializer
+ ) {
+ return this;
+ }
+
+ return new KvStateInfo<>(dupKeySerializer, dupNamespaceSerializer, dupSVSerializer);
+
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KvStateInfo<?, ?, ?> stateInfo = (KvStateInfo<?, ?, ?>) o;
+ return Objects.equals(keySerializer, stateInfo.keySerializer) &&
+ Objects.equals(namespaceSerializer, stateInfo.namespaceSerializer) &&
+ Objects.equals(stateValueSerializer, stateInfo.stateValueSerializer);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(keySerializer, namespaceSerializer, stateValueSerializer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index 2c55463..63d3c52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -41,8 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class KvStateRegistry {
/** All registered KvState instances. */
- private final ConcurrentHashMap<KvStateID, InternalKvState<?>> registeredKvStates =
- new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<KvStateID, KvStateEntry<?, ?, ?>> registeredKvStates = new ConcurrentHashMap<>(4);
/** Registry listeners to be notified on registration/unregistration. */
private final ConcurrentHashMap<JobID, KvStateRegistryListener> listeners = new ConcurrentHashMap<>(4);
@@ -86,11 +85,11 @@ public class KvStateRegistry {
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName,
- InternalKvState<?> kvState) {
+ InternalKvState<?, ?, ?> kvState) {
KvStateID kvStateId = new KvStateID();
- if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
+ if (registeredKvStates.putIfAbsent(kvStateId, new KvStateEntry<>(kvState)) == null) {
final KvStateRegistryListener listener = getKvStateRegistryListener(jobId);
if (listener != null) {
@@ -123,7 +122,10 @@ public class KvStateRegistry {
String registrationName,
KvStateID kvStateId) {
- if (registeredKvStates.remove(kvStateId) != null) {
+ KvStateEntry<?, ?, ?> entry = registeredKvStates.remove(kvStateId);
+ if (entry != null) {
+ entry.clear();
+
final KvStateRegistryListener listener = getKvStateRegistryListener(jobId);
if (listener != null) {
listener.notifyKvStateUnregistered(
@@ -136,13 +138,13 @@ public class KvStateRegistry {
}
/**
- * Returns the KvState instance identified by the given KvStateID or
- * <code>null</code> if none is registered.
+ * Returns the {@link KvStateEntry} containing the requested instance as identified by the
+ * given KvStateID, along with its {@link KvStateInfo} or <code>null</code> if none is registered.
*
* @param kvStateId KvStateID to identify the KvState instance
- * @return KvState instance identified by the KvStateID or <code>null</code>
+ * @return The {@link KvStateEntry} instance identified by the KvStateID or <code>null</code> if there is none
*/
- public InternalKvState<?> getKvState(KvStateID kvStateId) {
+ public KvStateEntry<?, ?, ?> getKvState(KvStateID kvStateId) {
return registeredKvStates.get(kvStateId);
}
@@ -174,5 +176,4 @@ public class KvStateRegistry {
}
return listener;
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
index f799b5a..a44a508 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
@@ -60,7 +60,7 @@ public class TaskKvStateRegistry {
* descriptor used to create the KvState instance)
* @param kvState The
*/
- public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, InternalKvState<?> kvState) {
+ public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, InternalKvState<?, ?, ?> kvState) {
KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupRange, registrationName, kvState);
registeredKvStates.add(new KvStateInfo(keyGroupRange, registrationName, kvStateId));
}
@@ -85,7 +85,7 @@ public class TaskKvStateRegistry {
private final KvStateID kvStateId;
- public KvStateInfo(KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
+ KvStateInfo(KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
this.keyGroupRange = keyGroupRange;
this.registrationName = registrationName;
this.kvStateId = kvStateId;
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index e7b3a1a..287474c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -53,6 +53,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Map;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -79,7 +80,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
private int currentKeyGroup;
/** So that we can give out state when the user uses the same key. */
- protected final HashMap<String, InternalKvState<?>> keyValueStatesByName;
+ protected final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
/** For caching the last accessed partitioned state */
private String lastName;
@@ -161,7 +162,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
* @param <N> The type of the namespace.
* @param <T> The type of the value that the {@code ValueState} can store.
*/
- protected abstract <N, T> InternalValueState<N, T> createValueState(
+ protected abstract <N, T> InternalValueState<K, N, T> createValueState(
TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<T> stateDesc) throws Exception;
@@ -174,7 +175,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
* @param <N> The type of the namespace.
* @param <T> The type of the values that the {@code ListState} can store.
*/
- protected abstract <N, T> InternalListState<N, T> createListState(
+ protected abstract <N, T> InternalListState<K, N, T> createListState(
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<T> stateDesc) throws Exception;
@@ -187,7 +188,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
* @param <N> The type of the namespace.
* @param <T> The type of the values that the {@code ListState} can store.
*/
- protected abstract <N, T> InternalReducingState<N, T> createReducingState(
+ protected abstract <N, T> InternalReducingState<K, N, T> createReducingState(
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<T> stateDesc) throws Exception;
@@ -200,7 +201,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
* @param <N> The type of the namespace.
* @param <T> The type of the values that the {@code ListState} can store.
*/
- protected abstract <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+ protected abstract <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
TypeSerializer<N> namespaceSerializer,
AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception;
@@ -217,7 +218,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
* @deprecated will be removed in a future version
*/
@Deprecated
- protected abstract <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
+ protected abstract <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
@@ -231,7 +232,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
* @param <UK> Type of the keys in the state
* @param <UV> Type of the values in the state *
*/
- protected abstract <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
+ protected abstract <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
TypeSerializer<N> namespaceSerializer,
MapStateDescriptor<UK, UV> stateDesc) throws Exception;
@@ -336,7 +337,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
}
- InternalKvState<?> existing = keyValueStatesByName.get(stateDescriptor.getName());
+ InternalKvState<K, ?, ?> existing = keyValueStatesByName.get(stateDescriptor.getName());
if (existing != null) {
@SuppressWarnings("unchecked")
S typedState = (S) existing;
@@ -379,7 +380,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
});
@SuppressWarnings("unchecked")
- InternalKvState<N> kvState = (InternalKvState<N>) state;
+ InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
keyValueStatesByName.put(stateDescriptor.getName(), kvState);
// Publish queryable state
@@ -416,7 +417,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
return (S) lastState;
}
- InternalKvState<?> previous = keyValueStatesByName.get(stateDescriptor.getName());
+ InternalKvState<K, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());
if (previous != null) {
lastState = previous;
lastState.setCurrentNamespace(namespace);
@@ -425,7 +426,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
}
final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
- final InternalKvState<N> kvState = (InternalKvState<N>) state;
+ final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
lastName = stateDescriptor.getName();
lastState = kvState;
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
index 3e76423..df762b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
@@ -28,18 +28,19 @@ import org.apache.flink.runtime.state.internal.InternalMergingState;
import java.util.Collection;
/**
- * Base class for {@link MergingState} ({@link org.apache.flink.runtime.state.internal.InternalMergingState})
- * that is stored on the heap.
+ * Base class for {@link MergingState} ({@link InternalMergingState}) that is stored on the heap.
*
* @param <K> The type of the key.
* @param <N> The type of the namespace.
+ * @param <IN> The type of the input elements.
* @param <SV> The type of the values in the state.
+ * @param <OUT> The type of the output elements.
* @param <S> The type of State
* @param <SD> The type of StateDescriptor for the State S
*/
-public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends State, SD extends StateDescriptor<S, ?>>
+public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT, S extends State, SD extends StateDescriptor<S, SV>>
extends AbstractHeapState<K, N, SV, S, SD>
- implements InternalMergingState<N, IN, OUT> {
+ implements InternalMergingState<K, N, IN, SV, OUT> {
/**
* The merge transformation function that implements the merge logic.
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 7f629ae..e889c53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -37,8 +37,7 @@ import org.apache.flink.util.Preconditions;
* @param <S> The type of State
* @param <SD> The type of StateDescriptor for the State S
*/
-public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
- implements InternalKvState<N> {
+public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> implements InternalKvState<K, N, SV> {
/** Map containing the actual key/value pairs. */
protected final StateTable<K, N, SV> stateTable;
@@ -86,28 +85,26 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
}
@Override
- public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
- Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+ public byte[] getSerializedValue(
+ final byte[] serializedKeyAndNamespace,
+ final TypeSerializer<K> safeKeySerializer,
+ final TypeSerializer<N> safeNamespaceSerializer,
+ final TypeSerializer<SV> safeValueSerializer) throws Exception {
- Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
- serializedKeyAndNamespace, keySerializer, namespaceSerializer);
-
- return getSerializedValue(keyAndNamespace.f0, keyAndNamespace.f1);
- }
+ Preconditions.checkNotNull(serializedKeyAndNamespace);
+ Preconditions.checkNotNull(safeKeySerializer);
+ Preconditions.checkNotNull(safeNamespaceSerializer);
+ Preconditions.checkNotNull(safeValueSerializer);
- public byte[] getSerializedValue(K key, N namespace) throws Exception {
- Preconditions.checkState(namespace != null, "No namespace given.");
- Preconditions.checkState(key != null, "No key given.");
+ Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+ serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
- SV result = stateTable.get(key, namespace);
+ SV result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
if (result == null) {
return null;
}
-
- @SuppressWarnings("unchecked,rawtypes")
- TypeSerializer serializer = stateDesc.getSerializer();
- return KvStateSerializer.serializeValue(result, serializer);
+ return KvStateSerializer.serializeValue(result, safeValueSerializer);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 6dd5cec..8e58ac8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -39,8 +39,8 @@ import java.io.IOException;
* @param <OUT> The type of the value returned from the state.
*/
public class HeapAggregatingState<K, N, IN, ACC, OUT>
- extends AbstractHeapMergingState<K, N, IN, OUT, ACC, AggregatingState<IN, OUT>, AggregatingStateDescriptor<IN, ACC, OUT>>
- implements InternalAggregatingState<N, IN, OUT> {
+ extends AbstractHeapMergingState<K, N, IN, ACC, OUT, AggregatingState<IN, OUT>, AggregatingStateDescriptor<IN, ACC, OUT>>
+ implements InternalAggregatingState<K, N, IN, ACC, OUT> {
private final AggregateTransformation<IN, ACC, OUT> aggregateTransformation;
@@ -64,6 +64,21 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
this.aggregateTransformation = new AggregateTransformation<>(stateDesc.getAggregateFunction());
}
+ @Override
+ public TypeSerializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ @Override
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ @Override
+ public TypeSerializer<ACC> getValueSerializer() {
+ return stateDesc.getSerializer();
+ }
+
// ------------------------------------------------------------------------
// state access
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index 3a77cca..ed1d0de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -29,8 +29,7 @@ import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
- * Heap-backed partitioned {@link FoldingState} that is
- * snapshotted into files.
+ * Heap-backed partitioned {@link FoldingState} that is snapshotted into files.
*
* @param <K> The type of the key.
* @param <N> The type of the namespace.
@@ -42,7 +41,7 @@ import java.io.IOException;
@Deprecated
public class HeapFoldingState<K, N, T, ACC>
extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
- implements InternalFoldingState<N, T, ACC> {
+ implements InternalFoldingState<K, N, T, ACC> {
/** The function used to fold the state */
private final FoldTransformation<T, ACC> foldTransformation;
@@ -63,6 +62,21 @@ public class HeapFoldingState<K, N, T, ACC>
this.foldTransformation = new FoldTransformation<>(stateDesc);
}
+ @Override
+ public TypeSerializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ @Override
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ @Override
+ public TypeSerializer<ACC> getValueSerializer() {
+ return stateDesc.getSerializer();
+ }
+
// ------------------------------------------------------------------------
// state access
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 63eb33b..82f883c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -92,8 +92,7 @@ import java.util.stream.Stream;
/**
* A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and will serialize state to
- * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
- * checkpointing.
+ * streams provided by a {@link CheckpointStreamFactory} upon checkpointing.
*
* @param <K> The key by which state is keyed.
*/
@@ -247,7 +246,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- public <N, V> InternalValueState<N, V> createValueState(
+ public <N, V> InternalValueState<K, N, V> createValueState(
TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<V> stateDesc) throws Exception {
@@ -256,7 +255,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- public <N, T> InternalListState<N, T> createListState(
+ public <N, T> InternalListState<K, N, T> createListState(
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<T> stateDesc) throws Exception {
@@ -265,7 +264,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- public <N, T> InternalReducingState<N, T> createReducingState(
+ public <N, T> InternalReducingState<K, N, T> createReducingState(
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<T> stateDesc) throws Exception {
@@ -274,7 +273,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- public <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+ public <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
TypeSerializer<N> namespaceSerializer,
AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
@@ -283,7 +282,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- public <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
+ public <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
@@ -292,7 +291,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- public <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer,
+ protected <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
+ TypeSerializer<N> namespaceSerializer,
MapStateDescriptor<UK, UV> stateDesc) throws Exception {
StateTable<K, N, HashMap<UK, UV>> stateTable = tryRegisterStateTable(
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index f7b5cd2..bd68560 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -21,7 +21,10 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.Preconditions;
@@ -30,16 +33,15 @@ import java.util.ArrayList;
import java.util.List;
/**
- * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted
- * into files.
+ * Heap-backed partitioned {@link ListState} that is snapshotted into files.
*
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <V> The type of the value.
*/
public class HeapListState<K, N, V>
- extends AbstractHeapMergingState<K, N, V, Iterable<V>, List<V>, ListState<V>, ListStateDescriptor<V>>
- implements InternalListState<N, V> {
+ extends AbstractHeapMergingState<K, N, V, List<V>, Iterable<V>, ListState<V>, ListStateDescriptor<V>>
+ implements InternalListState<K, N, V> {
/**
* Creates a new key/value state for the given hash map of key/value pairs.
@@ -56,6 +58,21 @@ public class HeapListState<K, N, V>
super(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
+ @Override
+ public TypeSerializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ @Override
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ @Override
+ public TypeSerializer<List<V>> getValueSerializer() {
+ return stateDesc.getSerializer();
+ }
+
// ------------------------------------------------------------------------
// state access
// ------------------------------------------------------------------------
@@ -82,24 +99,34 @@ public class HeapListState<K, N, V>
}
@Override
- public byte[] getSerializedValue(K key, N namespace) throws Exception {
- Preconditions.checkState(namespace != null, "No namespace given.");
- Preconditions.checkState(key != null, "No key given.");
+ public byte[] getSerializedValue(
+ final byte[] serializedKeyAndNamespace,
+ final TypeSerializer<K> safeKeySerializer,
+ final TypeSerializer<N> safeNamespaceSerializer,
+ final TypeSerializer<List<V>> safeValueSerializer) throws Exception {
+
+ Preconditions.checkNotNull(serializedKeyAndNamespace);
+ Preconditions.checkNotNull(safeKeySerializer);
+ Preconditions.checkNotNull(safeNamespaceSerializer);
+ Preconditions.checkNotNull(safeValueSerializer);
+
+ Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+ serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
- List<V> result = stateTable.get(key, namespace);
+ List<V> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
if (result == null) {
return null;
}
- TypeSerializer<V> serializer = stateDesc.getElementSerializer();
+ final TypeSerializer<V> dupSerializer = ((ListSerializer<V>) safeValueSerializer).getElementSerializer();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
// write the same as RocksDB writes lists, with one ',' separator
for (int i = 0; i < result.size(); i++) {
- serializer.serialize(result.get(i), view);
+ dupSerializer.serialize(result.get(i), view);
if (i < result.size() -1) {
view.writeByte(',');
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index 206f10a..7c18071 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -21,11 +21,12 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+import org.apache.flink.runtime.state.HashMapSerializer;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.Preconditions;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -33,14 +34,14 @@ import java.util.Map;
/**
* Heap-backed partitioned {@link MapState} that is snapshotted into files.
*
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
* @param <UK> The type of the keys in the state.
* @param <UV> The type of the values in the state.
*/
public class HeapMapState<K, N, UK, UV>
extends AbstractHeapState<K, N, HashMap<UK, UV>, MapState<UK, UV>, MapStateDescriptor<UK, UV>>
- implements InternalMapState<N, UK, UV> {
+ implements InternalMapState<K, N, UK, UV, HashMap<UK, UV>> {
/**
* Creates a new key/value state for the given hash map of key/value pairs.
@@ -58,6 +59,24 @@ public class HeapMapState<K, N, UK, UV>
}
@Override
+ public TypeSerializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ @Override
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ @Override
+ public TypeSerializer<HashMap<UK, UV>> getValueSerializer() {
+ return new HashMapSerializer<>(
+ stateDesc.getKeySerializer(),
+ stateDesc.getValueSerializer()
+ );
+ }
+
+ @Override
public UV get(UK userKey) {
HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
@@ -140,19 +159,31 @@ public class HeapMapState<K, N, UK, UV>
}
@Override
- public byte[] getSerializedValue(K key, N namespace) throws IOException {
- Preconditions.checkState(namespace != null, "No namespace given.");
- Preconditions.checkState(key != null, "No key given.");
+ public byte[] getSerializedValue(
+ final byte[] serializedKeyAndNamespace,
+ final TypeSerializer<K> safeKeySerializer,
+ final TypeSerializer<N> safeNamespaceSerializer,
+ final TypeSerializer<HashMap<UK, UV>> safeValueSerializer) throws Exception {
- HashMap<UK, UV> result = stateTable.get(key, namespace);
+ Preconditions.checkNotNull(serializedKeyAndNamespace);
+ Preconditions.checkNotNull(safeKeySerializer);
+ Preconditions.checkNotNull(safeNamespaceSerializer);
+ Preconditions.checkNotNull(safeValueSerializer);
- if (null == result) {
+ Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+ serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
+
+ Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
+
+ if (result == null) {
return null;
}
- TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
- TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
+ final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) safeValueSerializer;
+
+ final TypeSerializer<UK> dupUserKeySerializer = serializer.getKeySerializer();
+ final TypeSerializer<UV> dupUserValueSerializer = serializer.getValueSerializer();
- return KvStateSerializer.serializeMap(result.entrySet(), userKeySerializer, userValueSerializer);
+ return KvStateSerializer.serializeMap(result.entrySet(), dupUserKeySerializer, dupUserValueSerializer);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
index 6e11327..58b3128 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -29,8 +29,7 @@ import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
- * Heap-backed partitioned {@link org.apache.flink.api.common.state.ReducingState} that is
- * snapshotted into files.
+ * Heap-backed partitioned {@link ReducingState} that is snapshotted into files.
*
* @param <K> The type of the key.
* @param <N> The type of the namespace.
@@ -38,7 +37,7 @@ import java.io.IOException;
*/
public class HeapReducingState<K, N, V>
extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>, ReducingStateDescriptor<V>>
- implements InternalReducingState<N, V> {
+ implements InternalReducingState<K, N, V> {
private final ReduceTransformation<V> reduceTransformation;
@@ -59,6 +58,21 @@ public class HeapReducingState<K, N, V>
this.reduceTransformation = new ReduceTransformation<>(stateDesc.getReduceFunction());
}
+ @Override
+ public TypeSerializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ @Override
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ @Override
+ public TypeSerializer<V> getValueSerializer() {
+ return stateDesc.getSerializer();
+ }
+
// ------------------------------------------------------------------------
// state access
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
index 6de62a8..bf0a3cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
@@ -24,8 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalValueState;
/**
- * Heap-backed partitioned {@link org.apache.flink.api.common.state.ValueState} that is snapshotted
- * into files.
+ * Heap-backed partitioned {@link ValueState} that is snapshotted into files.
*
* @param <K> The type of the key.
* @param <N> The type of the namespace.
@@ -33,7 +32,7 @@ import org.apache.flink.runtime.state.internal.InternalValueState;
*/
public class HeapValueState<K, N, V>
extends AbstractHeapState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
- implements InternalValueState<N, V> {
+ implements InternalValueState<K, N, V> {
/**
* Creates a new key/value state for the given hash map of key/value pairs.
@@ -51,6 +50,21 @@ public class HeapValueState<K, N, V>
}
@Override
+ public TypeSerializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ @Override
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ @Override
+ public TypeSerializer<V> getValueSerializer() {
+ return stateDesc.getSerializer();
+ }
+
+ @Override
public V value() {
final V result = stateTable.get(currentNamespace);
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
index 15a8e31..b66404c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
@@ -24,10 +24,11 @@ import org.apache.flink.api.common.state.AggregatingState;
* The peer to the {@link AggregatingState} in the internal state type hierarchy.
*
* <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- *
- * @param <N> The type of the namespace
- * @param <IN> Type of the value added to the state.
- * @param <OUT> Type of the value extracted from the state.
+ *
+ * @param <K> The type of the key the state is associated with
+ * @param <N> The type of the namespace
+ * @param <IN> Type of the value added to the state
+ * @param <SV> The type of elements in the state
+ * @param <OUT> Type of the value extracted from the state
*/
-public interface InternalAggregatingState<N, IN, OUT>
- extends InternalMergingState<N, IN, OUT>, AggregatingState<IN, OUT> {}
+public interface InternalAggregatingState<K, N, IN, SV, OUT> extends InternalMergingState<K, N, IN, SV, OUT>, AggregatingState<IN, OUT> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
index ae9f457..3cb84af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
@@ -24,9 +24,11 @@ import org.apache.flink.api.common.state.AppendingState;
* The peer to the {@link AppendingState} in the internal state type hierarchy.
*
* <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- *
- * @param <N> The type of the namespace
- * @param <IN> The type of elements added to the state
- * @param <OUT> The type of the
+ *
+ * @param <K> The type of the key the state is associated with
+ * @param <N> The type of the namespace
+ * @param <IN> The type of elements added to the state
+ * @param <SV> The type of elements in the state
+ * @param <OUT> The type of the resulting element in the state
*/
-public interface InternalAppendingState<N, IN, OUT> extends InternalKvState<N>, AppendingState<IN, OUT> {}
+public interface InternalAppendingState<K, N, IN, SV, OUT> extends InternalKvState<K, N, SV>, AppendingState<IN, OUT> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
index 4ef258f..ed53d82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
@@ -24,7 +24,8 @@ import org.apache.flink.api.common.state.FoldingState;
* The peer to the {@link FoldingState} in the internal state type hierarchy.
*
* <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- *
+ *
+ * @param <K> The type of the key the state is associated with
* @param <N> The type of the namespace
* @param <T> Type of the values folded into the state
* @param <ACC> Type of the value in the state
@@ -32,4 +33,4 @@ import org.apache.flink.api.common.state.FoldingState;
* @deprecated will be removed in a future version
*/
@Deprecated
-public interface InternalFoldingState<N, T, ACC> extends InternalAppendingState<N, T, ACC>, FoldingState<T, ACC> {}
+public interface InternalFoldingState<K, N, T, ACC> extends InternalAppendingState<K, N, T, ACC, ACC>, FoldingState<T, ACC> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
index 06f64b6..1310dd2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.internal;
import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
/**
* The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the
@@ -52,10 +53,27 @@ import org.apache.flink.api.common.state.State;
* | |
* +---------InternalReducingState
* </pre>
- *
- * @param <N> The type of the namespace.
+ *
+ * @param <K> The type of the key the state is associated with
+ * @param <N> The type of the namespace
+ * @param <V> The type of values kept internally in state
*/
-public interface InternalKvState<N> extends State {
+public interface InternalKvState<K, N, V> extends State {
+
+ /**
+ * Returns the {@link TypeSerializer} for the type of the key this state is associated with.
+ */
+ TypeSerializer<K> getKeySerializer();
+
+ /**
+ * Returns the {@link TypeSerializer} for the type of the namespace this state is associated with.
+ */
+ TypeSerializer<N> getNamespaceSerializer();
+
+ /**
+ * Returns the {@link TypeSerializer} for the type of value this state holds.
+ */
+ TypeSerializer<V> getValueSerializer();
/**
* Sets the current namespace, which will be used when using the state access methods.
@@ -70,10 +88,21 @@ public interface InternalKvState<N> extends State {
* <p>If no value is associated with key and namespace, <code>null</code>
* is returned.
*
+ * <p><b>TO IMPLEMENTERS:</b> This method is called by multiple threads. Anything
+ * stateful (e.g. serializers) should be either duplicated or protected from undesired
+ * consequences of concurrent invocations.
+ *
* @param serializedKeyAndNamespace Serialized key and namespace
+ * @param safeKeySerializer A key serializer which is safe to be used even in multi-threaded context
+ * @param safeNamespaceSerializer A namespace serializer which is safe to be used even in multi-threaded context
+ * @param safeValueSerializer A value serializer which is safe to be used even in multi-threaded context
* @return Serialized value or <code>null</code> if no value is associated with the key and namespace.
*
* @throws Exception Exceptions during serialization are forwarded
*/
- byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;
+ byte[] getSerializedValue(
+ final byte[] serializedKeyAndNamespace,
+ final TypeSerializer<K> safeKeySerializer,
+ final TypeSerializer<N> safeNamespaceSerializer,
+ final TypeSerializer<V> safeValueSerializer) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
index 1e22dc6..1d6653b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
@@ -26,11 +26,13 @@ import java.util.List;
* The peer to the {@link ListState} in the internal state type hierarchy.
*
* <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- *
+ *
+ * @param <K> The type of key the state is associated to
* @param <N> The type of the namespace
* @param <T> The type of elements in the list
*/
-public interface InternalListState<N, T> extends InternalMergingState<N, T, Iterable<T>>, ListState<T> {
+public interface InternalListState<K, N, T> extends InternalMergingState<K, N, T, List<T>, Iterable<T>>, ListState<T> {
+
/**
* Updates the operator state accessible by {@link #get()} by updating existing values to
* to the given list of values. The next time {@link #get()} is called (for the same state
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
index f2a7b41..91f698c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
@@ -20,13 +20,16 @@ package org.apache.flink.runtime.state.internal;
import org.apache.flink.api.common.state.MapState;
+import java.util.Map;
+
/**
* The peer to the {@link MapState} in the internal state type hierarchy.
*
* <p>See {@link InternalKvState} for a description of the internal state hierarchy.
*
+ * @param <K> The type of key the state is associated to
* @param <N> The type of the namespace
* @param <UK> Type of the values folded into the state
* @param <UV> Type of the value in the state
*/
-public interface InternalMapState<N, UK, UV> extends InternalKvState<N>, MapState<UK, UV> {}
+public interface InternalMapState<K, N, UK, UV, ST extends Map<UK, UV>> extends InternalKvState<K, N, ST>, MapState<UK, UV> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
index abc7d7c..2c72697 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
@@ -26,12 +26,14 @@ import java.util.Collection;
* The peer to the {@link MergingState} in the internal state type hierarchy.
*
* See {@link InternalKvState} for a description of the internal state hierarchy.
- *
- * @param <N> The type of the namespace
- * @param <IN> The type of elements added to the state
- * @param <OUT> The type of elements
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <IN> The type of elements added to the state
+ * @param <SV> The type of elements in the state
+ * @param <OUT> The type of elements
*/
-public interface InternalMergingState<N, IN, OUT> extends InternalAppendingState<N, IN, OUT>, MergingState<IN, OUT> {
+public interface InternalMergingState<K, N, IN, SV, OUT> extends InternalAppendingState<K, N, IN, SV, OUT>, MergingState<IN, OUT> {
/**
* Merges the state of the current key for the given source namespaces into the state of
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
index 76fa58f..f7bff2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
@@ -24,8 +24,9 @@ import org.apache.flink.api.common.state.ReducingState;
* The peer to the {@link ReducingState} in the internal state type hierarchy.
*
* <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- *
+ *
+ * @param <K> The type of key the state is associated to
* @param <N> The type of the namespace
* @param <T> The type of elements in the aggregated by the ReduceFunction
*/
-public interface InternalReducingState<N, T> extends InternalMergingState<N, T, T>, ReducingState<T> {}
+public interface InternalReducingState<K, N, T> extends InternalMergingState<K, N, T, T, T>, ReducingState<T> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
index 7177b8a..169cdba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
@@ -24,8 +24,9 @@ import org.apache.flink.api.common.state.ValueState;
* The peer to the {@link ValueState} in the internal state type hierarchy.
*
* <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- *
+ *
+ * @param <K> The type of key the state is associated to
* @param <N> The type of the namespace
* @param <T> The type of elements in the list
*/
-public interface InternalValueState<N, T> extends InternalKvState<N>, ValueState<T> {}
+public interface InternalValueState<K, N, T> extends InternalKvState<K, N, T>, ValueState<T> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
index 43aa1d1..36a85d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
@@ -19,27 +19,126 @@
package org.apache.flink.runtime.query;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
/**
* Tests for the {@link KvStateRegistry}.
*/
public class KvStateRegistryTest extends TestLogger {
+ @Test
+ public void testKvStateEntry() throws InterruptedException {
+ final int threads = 10;
+
+ final CountDownLatch latch1 = new CountDownLatch(threads);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ final List<KvStateInfo<?, ?, ?>> infos = Collections.synchronizedList(new ArrayList<>());
+
+ final JobID jobID = new JobID();
+
+ final JobVertexID jobVertexId = new JobVertexID();
+ final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
+ final String registrationName = "foobar";
+
+ final KvStateRegistry kvStateRegistry = new KvStateRegistry();
+ final KvStateID stateID = kvStateRegistry.registerKvState(
+ jobID,
+ jobVertexId,
+ keyGroupRange,
+ registrationName,
+ new DummyKvState()
+ );
+
+ final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
+
+ for (int i = 0; i < threads; i++) {
+ new Thread(() -> {
+ final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
+ final KvStateInfo<?, ?, ?> stateInfo = kvState.getInfoForCurrentThread();
+ infos.add(stateInfo);
+
+ latch1.countDown();
+ try {
+ latch2.await();
+ } catch (InterruptedException e) {
+ // compare and set, so that we do not overwrite an exception
+ // that was (potentially) already encountered.
+ exceptionHolder.compareAndSet(null, e);
+ }
+
+ }).start();
+ }
+
+ latch1.await();
+
+ final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
+
+ // verify that every thread recorded its info and that the cache holds one entry per thread.
+ Assert.assertEquals(threads, infos.size());
+ Assert.assertEquals(threads, kvState.getCacheSize());
+
+ latch2.countDown();
+
+ for (KvStateInfo<?, ?, ?> infoA: infos) {
+ boolean instanceAlreadyFound = false;
+ for (KvStateInfo<?, ?, ?> infoB: infos) {
+ if (infoA == infoB) {
+ if (instanceAlreadyFound) {
+ Assert.fail("More than one thread sharing the same serializer instance.");
+ }
+ instanceAlreadyFound = true;
+ } else {
+ Assert.assertEquals(infoA, infoB);
+ }
+ }
+ }
+
+ kvStateRegistry.unregisterKvState(
+ jobID,
+ jobVertexId,
+ keyGroupRange,
+ registrationName,
+ stateID);
+
+ Assert.assertEquals(0L, kvState.getCacheSize());
+
+ Throwable t = exceptionHolder.get();
+ if (t != null) {
+ fail(t.getMessage());
+ }
+ }
+
/**
* Tests that {@link KvStateRegistryListener} only receive the notifications which
* are destined for them.
@@ -74,7 +173,7 @@ public class KvStateRegistryTest extends TestLogger {
jobVertexId,
keyGroupRange,
registrationName,
- new DummyKvState<>());
+ new DummyKvState());
assertThat(registeredNotifications1.poll(), equalTo(jobId1));
assertThat(registeredNotifications2.isEmpty(), is(true));
@@ -87,7 +186,7 @@ public class KvStateRegistryTest extends TestLogger {
jobVertexId2,
keyGroupRange2,
registrationName2,
- new DummyKvState<>());
+ new DummyKvState());
assertThat(registeredNotifications2.poll(), equalTo(jobId2));
assertThat(registeredNotifications1.isEmpty(), is(true));
@@ -191,18 +290,35 @@ public class KvStateRegistryTest extends TestLogger {
/**
* Testing implementation of {@link InternalKvState}.
- *
- * @param <T> type of the state
*/
- private static final class DummyKvState<T> implements InternalKvState<T> {
+ private static class DummyKvState implements InternalKvState<Integer, VoidNamespace, String> {
@Override
- public void setCurrentNamespace(Object namespace) {
+ public TypeSerializer<Integer> getKeySerializer() {
+ return IntSerializer.INSTANCE;
+ }
+
+ @Override
+ public TypeSerializer<VoidNamespace> getNamespaceSerializer() {
+ return VoidNamespaceSerializer.INSTANCE;
+ }
+
+ @Override
+ public TypeSerializer<String> getValueSerializer() {
+ return new DeepCopyingStringSerializer();
+ }
+
+ @Override
+ public void setCurrentNamespace(VoidNamespace namespace) {
// noop
}
@Override
- public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
+ public byte[] getSerializedValue(
+ final byte[] serializedKeyAndNamespace,
+ final TypeSerializer<Integer> safeKeySerializer,
+ final TypeSerializer<VoidNamespace> safeNamespaceSerializer,
+ final TypeSerializer<String> safeValueSerializer) throws Exception {
return serializedKeyAndNamespace;
}
@@ -212,4 +328,86 @@ public class KvStateRegistryTest extends TestLogger {
}
}
+ /**
+ * A dummy serializer whose {@link #duplicate()} always returns a new instance.
+ */
+ private static class DeepCopyingStringSerializer extends TypeSerializer<String> {
+
+ private static final long serialVersionUID = -3744051158625555607L;
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<String> duplicate() {
+ return new DeepCopyingStringSerializer();
+ }
+
+ @Override
+ public String createInstance() {
+ return null;
+ }
+
+ @Override
+ public String copy(String from) {
+ return null;
+ }
+
+ @Override
+ public String copy(String from, String reuse) {
+ return null;
+ }
+
+ @Override
+ public int getLength() {
+ return 0;
+ }
+
+ @Override
+ public void serialize(String record, DataOutputView target) throws IOException {
+
+ }
+
+ @Override
+ public String deserialize(DataInputView source) throws IOException {
+ return null;
+ }
+
+ @Override
+ public String deserialize(String reuse, DataInputView source) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof DeepCopyingStringSerializer;
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ return null;
+ }
+
+ @Override
+ public CompatibilityResult<String> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ return null;
+ }
+ }
}
[2/5] flink git commit: [FLINK-8928] [QS] Improve server binding
error message.
Posted by kk...@apache.org.
[FLINK-8928] [QS] Improve server binding error message.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6a8172a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6a8172a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6a8172a9
Branch: refs/heads/master
Commit: 6a8172a9c654589faa01f1fccb2dec5e008fe532
Parents: eea887b
Author: kkloudas <kk...@gmail.com>
Authored: Mon Mar 12 12:12:06 2018 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Mar 29 17:33:07 2018 +0200
----------------------------------------------------------------------
.../apache/flink/queryablestate/network/AbstractServerBase.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6a8172a9/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index d5afeb3..2e67b98 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -194,7 +194,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
if (serverAddress != null) {
log.info("Started {} @ {}.", serverName, serverAddress);
} else {
- log.info("Unable to start {}. All ports in provided range are occupied.", serverName);
+ log.info("Unable to start {}. All ports in provided range ({}) are occupied.", serverName, bindPortRange);
throw new FlinkRuntimeException("Unable to start " + serverName + ". All ports in provided range are occupied.");
}
}
[3/5] flink git commit: [FLINK-8926] [QS] Shutdown client proxy after
test ends.
Posted by kk...@apache.org.
[FLINK-8926] [QS] Shutdown client proxy after test ends.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c16e2c9b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c16e2c9b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c16e2c9b
Branch: refs/heads/master
Commit: c16e2c9b23fe0f287041a3eff51c7ee221da0dc5
Parents: 6a8172a
Author: kkloudas <kk...@gmail.com>
Authored: Mon Mar 12 13:19:23 2018 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Mar 29 17:33:20 2018 +0200
----------------------------------------------------------------------
.../client/proxy/KvStateClientProxyImplTest.java | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c16e2c9b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
index fc8b8da..cd33255 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.util.TestLogger;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -53,6 +54,11 @@ public class KvStateClientProxyImplTest extends TestLogger {
new DisabledKvStateRequestStats());
}
+ @After
+ public void shutdown() {
+ kvStateClientProxy.shutdown();
+ }
+
/**
* Tests that we can set and retrieve the {@link KvStateLocationOracle}.
*/