You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/09 14:47:37 UTC
[03/10] flink git commit: [FLINK-3779] [runtime] Add
getSerializedValue(byte[]) to KvState
http://git-wip-us.apache.org/repos/asf/flink/blob/a909adbf/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 80f1de3..d59e17b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.state;
import com.google.common.base.Joiner;
-
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
@@ -37,18 +36,34 @@ import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateRegistryListener;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.types.IntValue;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.util.Collections;
import java.util.HashMap;
-
-import static org.junit.Assert.*;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
/**
* Generic tests for the partitioned state part of {@link AbstractStateBackend}.
@@ -74,24 +89,33 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
}
@Test
+ @SuppressWarnings("unchecked")
public void testValueState() throws Exception {
-
backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
kvId.initializeSerializerUnlessSet(new ExecutionConfig());
- ValueState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+ TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
+ TypeSerializer<String> valueSerializer = kvId.getSerializer();
+
+ ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ KvState<Integer, VoidNamespace, ?, ?, B> kvState = (KvState<Integer, VoidNamespace, ?, ?, B>) state;
// some modifications to the state
backend.setCurrentKey(1);
assertNull(state.value());
+ assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
state.update("1");
backend.setCurrentKey(2);
assertNull(state.value());
+ assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
state.update("2");
backend.setCurrentKey(1);
assertEquals("1", state.value());
+ assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
// draw a snapshot
HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2);
@@ -122,10 +146,13 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
// validate the original state
backend.setCurrentKey(1);
assertEquals("u1", state.value());
+ assertEquals("u1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(2);
assertEquals("u2", state.value());
+ assertEquals("u2", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(3);
assertEquals("u3", state.value());
+ assertEquals("u3", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.dispose();
backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
@@ -136,12 +163,16 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
snapshot1.get(key).discardState();
}
- ValueState<String> restored1 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ ValueState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState1 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored1;
backend.setCurrentKey(1);
assertEquals("1", restored1.value());
+ assertEquals("1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(2);
assertEquals("2", restored1.value());
+ assertEquals("2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.dispose();
backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
@@ -152,14 +183,19 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
snapshot2.get(key).discardState();
}
- ValueState<String> restored2 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ ValueState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState2 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored2;
backend.setCurrentKey(1);
assertEquals("u1", restored2.value());
+ assertEquals("u1", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(2);
assertEquals("u2", restored2.value());
+ assertEquals("u2", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(3);
assertEquals("u3", restored2.value());
+ assertEquals("u3", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
}
/**
@@ -169,14 +205,14 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
* @throws Exception
*/
@Test
+ @SuppressWarnings("unchecked")
public void testValueStateNullUpdate() throws Exception {
-
// precondition: LongSerializer must fail on null value. this way the test would fail
// later if null values where actually stored in the state instead of acting as clear()
try {
LongSerializer.INSTANCE.serialize(null,
new DataOutputViewStreamWrapper(new ByteArrayOutputStream()));
- fail("Should faill with NullPointerException");
+ fail("Should fail with NullPointerException");
} catch (NullPointerException e) {
// alrighty
}
@@ -186,7 +222,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
ValueStateDescriptor<Long> kvId = new ValueStateDescriptor<>("id", LongSerializer.INSTANCE, 42L);
kvId.initializeSerializerUnlessSet(new ExecutionConfig());
- ValueState<Long> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ ValueState<Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
// some modifications to the state
backend.setCurrentKey(1);
@@ -218,7 +254,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
}
}
-
backend.dispose();
backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
@@ -228,7 +263,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
snapshot1.get(key).discardState();
}
- backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
}
@Test
@@ -238,18 +273,29 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
- ListState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+ TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
+ TypeSerializer<String> valueSerializer = kvId.getSerializer();
+
+ ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ KvState<Integer, VoidNamespace, ?, ?, B> kvState = (KvState<Integer, VoidNamespace, ?, ?, B>) state;
Joiner joiner = Joiner.on(",");
// some modifications to the state
backend.setCurrentKey(1);
assertEquals(null, state.get());
+ assertEquals(null, getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
state.add("1");
backend.setCurrentKey(2);
assertEquals(null, state.get());
+ assertEquals(null, getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
state.add("2");
backend.setCurrentKey(1);
assertEquals("1", joiner.join(state.get()));
+ assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
// draw a snapshot
HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2);
@@ -280,10 +326,13 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
// validate the original state
backend.setCurrentKey(1);
assertEquals("1,u1", joiner.join(state.get()));
+ assertEquals("1,u1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
backend.setCurrentKey(2);
assertEquals("2,u2", joiner.join(state.get()));
+ assertEquals("2,u2", joiner.join(getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
backend.setCurrentKey(3);
assertEquals("u3", joiner.join(state.get()));
+ assertEquals("u3", joiner.join(getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
backend.dispose();
@@ -295,12 +344,16 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
snapshot1.get(key).discardState();
}
- ListState<String> restored1 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ ListState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState1 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored1;
backend.setCurrentKey(1);
assertEquals("1", joiner.join(restored1.get()));
+ assertEquals("1", joiner.join(getSerializedList(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
backend.setCurrentKey(2);
assertEquals("2", joiner.join(restored1.get()));
+ assertEquals("2", joiner.join(getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
backend.dispose();
@@ -312,14 +365,19 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
snapshot2.get(key).discardState();
}
- ListState<String> restored2 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ ListState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState2 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored2;
backend.setCurrentKey(1);
assertEquals("1,u1", joiner.join(restored2.get()));
+ assertEquals("1,u1", joiner.join(getSerializedList(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
backend.setCurrentKey(2);
assertEquals("2,u2", joiner.join(restored2.get()));
+ assertEquals("2,u2", joiner.join(getSerializedList(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
backend.setCurrentKey(3);
assertEquals("u3", joiner.join(restored2.get()));
+ assertEquals("u3", joiner.join(getSerializedList(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
}
catch (Exception e) {
e.printStackTrace();
@@ -328,23 +386,34 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
}
@Test
+ @SuppressWarnings("unchecked")
public void testReducingState() {
try {
backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
+ kvId.initializeSerializerUnlessSet(new ExecutionConfig());
- ReducingState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+ TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
+ TypeSerializer<String> valueSerializer = kvId.getSerializer();
+
+ ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ KvState<Integer, VoidNamespace, ?, ?, B> kvState = (KvState<Integer, VoidNamespace, ?, ?, B>) state;
// some modifications to the state
backend.setCurrentKey(1);
assertEquals(null, state.get());
+ assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
state.add("1");
backend.setCurrentKey(2);
assertEquals(null, state.get());
+ assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
state.add("2");
backend.setCurrentKey(1);
assertEquals("1", state.get());
+ assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
// draw a snapshot
HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2);
@@ -375,10 +444,13 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
// validate the original state
backend.setCurrentKey(1);
assertEquals("1,u1", state.get());
+ assertEquals("1,u1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(2);
assertEquals("2,u2", state.get());
+ assertEquals("2,u2", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(3);
assertEquals("u3", state.get());
+ assertEquals("u3", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.dispose();
@@ -390,12 +462,16 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
snapshot1.get(key).discardState();
}
- ReducingState<String> restored1 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ ReducingState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState1 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored1;
backend.setCurrentKey(1);
assertEquals("1", restored1.get());
+ assertEquals("1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(2);
assertEquals("2", restored1.get());
+ assertEquals("2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.dispose();
@@ -407,15 +483,19 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
snapshot2.get(key).discardState();
}
- ReducingState<String> restored2 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
-
+ ReducingState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState2 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored2;
backend.setCurrentKey(1);
assertEquals("1,u1", restored2.get());
+ assertEquals("1,u1", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(2);
assertEquals("2,u2", restored2.get());
+ assertEquals("2,u2", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(3);
assertEquals("u3", restored2.get());
+ assertEquals("u3", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
}
catch (Exception e) {
e.printStackTrace();
@@ -432,19 +512,29 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
FoldingStateDescriptor<Integer, String> kvId = new FoldingStateDescriptor<>("id",
"Fold-Initial:",
new AppendingFold(),
- String.class);
+ String.class);
+ kvId.initializeSerializerUnlessSet(new ExecutionConfig());
- FoldingState<Integer, String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+ TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
+ TypeSerializer<String> valueSerializer = kvId.getSerializer();
+
+ FoldingState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ KvState<Integer, VoidNamespace, ?, ?, B> kvState = (KvState<Integer, VoidNamespace, ?, ?, B>) state;
// some modifications to the state
backend.setCurrentKey(1);
assertEquals(null, state.get());
+ assertEquals(null, getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
state.add(1);
backend.setCurrentKey(2);
assertEquals(null, state.get());
+ assertEquals(null, getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
state.add(2);
backend.setCurrentKey(1);
assertEquals("Fold-Initial:,1", state.get());
+ assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
// draw a snapshot
HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2);
@@ -476,10 +566,13 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
// validate the original state
backend.setCurrentKey(1);
assertEquals("Fold-Initial:,101", state.get());
+ assertEquals("Fold-Initial:,101", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(2);
assertEquals("Fold-Initial:,2,102", state.get());
+ assertEquals("Fold-Initial:,2,102", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(3);
assertEquals("Fold-Initial:,103", state.get());
+ assertEquals("Fold-Initial:,103", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.dispose();
@@ -491,12 +584,16 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
snapshot1.get(key).discardState();
}
- FoldingState<Integer, String> restored1 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ FoldingState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState1 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored1;
backend.setCurrentKey(1);
assertEquals("Fold-Initial:,1", restored1.get());
+ assertEquals("Fold-Initial:,1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(2);
assertEquals("Fold-Initial:,2", restored1.get());
+ assertEquals("Fold-Initial:,2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.dispose();
@@ -509,14 +606,19 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
}
@SuppressWarnings("unchecked")
- FoldingState<Integer, String> restored2 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ FoldingState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState2 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored2;
backend.setCurrentKey(1);
assertEquals("Fold-Initial:,101", restored2.get());
+ assertEquals("Fold-Initial:,101", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(2);
assertEquals("Fold-Initial:,2,102", restored2.get());
+ assertEquals("Fold-Initial:,2,102", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
backend.setCurrentKey(3);
assertEquals("Fold-Initial:,103", restored2.get());
+ assertEquals("Fold-Initial:,103", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
}
catch (Exception e) {
e.printStackTrace();
@@ -525,6 +627,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
}
@Test
+ @SuppressWarnings("unchecked")
public void testValueStateRestoreWithWrongSerializers() {
try {
backend.initializeForJob(new DummyEnvironment("test", 1, 0),
@@ -534,7 +637,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
kvId.initializeSerializerUnlessSet(new ExecutionConfig());
- ValueState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
backend.setCurrentKey(1);
state.update("1");
@@ -567,7 +670,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
try {
kvId = new ValueStateDescriptor<>("id", fakeStringSerializer, null);
- state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
state.value();
@@ -588,12 +691,13 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
}
@Test
+ @SuppressWarnings("unchecked")
public void testListStateRestoreWithWrongSerializers() {
try {
backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
- ListState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
backend.setCurrentKey(1);
state.add("1");
@@ -626,7 +730,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
try {
kvId = new ListStateDescriptor<>("id", fakeStringSerializer);
- state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
state.get();
@@ -647,6 +751,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
}
@Test
+ @SuppressWarnings("unchecked")
public void testReducingStateRestoreWithWrongSerializers() {
try {
backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
@@ -654,14 +759,13 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id",
new AppendingReduce(),
StringSerializer.INSTANCE);
- ReducingState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
backend.setCurrentKey(1);
state.add("1");
backend.setCurrentKey(2);
state.add("2");
-
// draw a snapshot
HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2);
@@ -688,7 +792,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
try {
kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), fakeStringSerializer);
- state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
state.get();
@@ -715,7 +819,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
kvId.initializeSerializerUnlessSet(new ExecutionConfig());
- ValueState<IntValue> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+ ValueState<IntValue> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
backend.setCurrentKey(1);
IntValue default1 = state.value();
@@ -729,6 +833,222 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
assertFalse(default1 == default2);
}
+ /**
+ * Previously, it was possible to create partitioned state with a
+ * <code>null</code> namespace. This test makes sure that a null namespace
+ * or a null namespace serializer is now rejected with a NullPointerException.
+ */
+ @Test
+ public void testRequireNonNullNamespace() throws Exception {
+ backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+
+ ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
+ kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ try {
+ backend.getPartitionedState(null, VoidNamespaceSerializer.INSTANCE, kvId);
+ fail("Did not throw expected NullPointerException");
+ } catch (NullPointerException ignored) {
+ }
+
+ try {
+ backend.getPartitionedState(VoidNamespace.INSTANCE, null, kvId);
+ fail("Did not throw expected NullPointerException");
+ } catch (NullPointerException ignored) {
+ }
+
+ try {
+ backend.getPartitionedState(null, null, kvId);
+ fail("Did not throw expected NullPointerException");
+ } catch (NullPointerException ignored) {
+ }
+ }
+
+ /**
+ * Tests that queryable {@link AbstractHeapState} instances (value, list, reducing, folding)
+ * use a {@link ConcurrentHashMap} for both the state map and the per-namespace map.
+ */
+ @SuppressWarnings("unchecked")
+ protected static <B extends AbstractStateBackend> void testConcurrentMapIfQueryable(B backend) throws Exception {
+ {
+ // ValueState
+ ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>(
+ "value-state",
+ Integer.class,
+ -1);
+ desc.setQueryable("my-query");
+ desc.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ ValueState<Integer> state = backend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ desc);
+
+ KvState<Integer, VoidNamespace, ?, ?, ?> kvState = (KvState<Integer, VoidNamespace, ?, ?, ?>) state;
+ assertTrue(kvState instanceof AbstractHeapState);
+
+ Map<VoidNamespace, Map<Integer, ?>> stateMap = ((AbstractHeapState) kvState).getStateMap();
+ assertTrue(stateMap instanceof ConcurrentHashMap);
+
+ kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+ kvState.setCurrentKey(1);
+ state.update(121818273);
+
+ Map<Integer, ?> namespaceMap = stateMap.get(VoidNamespace.INSTANCE);
+
+ assertNotNull("Value not set", namespaceMap);
+ assertTrue(namespaceMap instanceof ConcurrentHashMap);
+ }
+
+ {
+ // ListState
+ ListStateDescriptor<Integer> desc = new ListStateDescriptor<>("list-state", Integer.class);
+ desc.setQueryable("my-query");
+ desc.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ ListState<Integer> state = backend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ desc);
+
+ KvState<Integer, VoidNamespace, ?, ?, ?> kvState = (KvState<Integer, VoidNamespace, ?, ?, ?>) state;
+ assertTrue(kvState instanceof AbstractHeapState);
+
+ Map<VoidNamespace, Map<Integer, ?>> stateMap = ((AbstractHeapState) kvState).getStateMap();
+ assertTrue(stateMap instanceof ConcurrentHashMap);
+
+ kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+ kvState.setCurrentKey(1);
+ state.add(121818273);
+
+ Map<Integer, ?> namespaceMap = stateMap.get(VoidNamespace.INSTANCE);
+
+ assertNotNull("List not set", namespaceMap);
+ assertTrue(namespaceMap instanceof ConcurrentHashMap);
+ }
+
+ {
+ // ReducingState
+ ReducingStateDescriptor<Integer> desc = new ReducingStateDescriptor<>(
+ "reducing-state", new ReduceFunction<Integer>() {
+ @Override
+ public Integer reduce(Integer value1, Integer value2) throws Exception {
+ return value1 + value2;
+ }
+ }, Integer.class);
+ desc.setQueryable("my-query");
+ desc.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ ReducingState<Integer> state = backend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ desc);
+
+ KvState<Integer, VoidNamespace, ?, ?, ?> kvState = (KvState<Integer, VoidNamespace, ?, ?, ?>) state;
+ assertTrue(kvState instanceof AbstractHeapState);
+
+ Map<VoidNamespace, Map<Integer, ?>> stateMap = ((AbstractHeapState) kvState).getStateMap();
+ assertTrue(stateMap instanceof ConcurrentHashMap);
+
+ kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+ kvState.setCurrentKey(1);
+ state.add(121818273);
+
+ Map<Integer, ?> namespaceMap = stateMap.get(VoidNamespace.INSTANCE);
+
+ assertNotNull("Reducing state value not set", namespaceMap);
+ assertTrue(namespaceMap instanceof ConcurrentHashMap);
+ }
+
+ {
+ // FoldingState
+ FoldingStateDescriptor<Integer, Integer> desc = new FoldingStateDescriptor<>(
+ "folding-state", 0, new FoldFunction<Integer, Integer>() {
+ @Override
+ public Integer fold(Integer accumulator, Integer value) throws Exception {
+ return accumulator + value;
+ }
+ }, Integer.class);
+ desc.setQueryable("my-query");
+ desc.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ FoldingState<Integer, Integer> state = backend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ desc);
+
+ KvState<Integer, VoidNamespace, ?, ?, ?> kvState = (KvState<Integer, VoidNamespace, ?, ?, ?>) state;
+ assertTrue(kvState instanceof AbstractHeapState);
+
+ Map<VoidNamespace, Map<Integer, ?>> stateMap = ((AbstractHeapState) kvState).getStateMap();
+ assertTrue(stateMap instanceof ConcurrentHashMap);
+
+ kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+ kvState.setCurrentKey(1);
+ state.add(121818273);
+
+ Map<Integer, ?> namespaceMap = stateMap.get(VoidNamespace.INSTANCE);
+
+ assertNotNull("Folding state value not set", namespaceMap);
+ assertTrue(namespaceMap instanceof ConcurrentHashMap);
+ }
+ }
+
+ /**
+ * Tests that queryable state is registered with the KvStateRegistry, unregistered on dispose and registered again after a restore.
+ */
+ @Test
+ public void testQueryableStateRegistration() throws Exception {
+ DummyEnvironment env = new DummyEnvironment("test", 1, 0);
+ KvStateRegistry registry = env.getKvStateRegistry();
+
+ KvStateRegistryListener listener = mock(KvStateRegistryListener.class);
+ registry.registerListener(listener);
+
+ backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE);
+
+ ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>(
+ "test",
+ IntSerializer.INSTANCE,
+ null);
+ desc.setQueryable("banana");
+
+ backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc);
+
+ // Verify that creating the queryable state notified the listener exactly once
+ verify(listener, times(1)).notifyKvStateRegistered(
+ eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"), any(KvStateID.class));
+
+
+ HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot = backend
+ .snapshotPartitionedState(682375462379L, 4);
+
+ for (String key: snapshot.keySet()) {
+ if (snapshot.get(key) instanceof AsynchronousKvStateSnapshot) {
+ snapshot.put(key, ((AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) snapshot.get(key)).materialize());
+ }
+ }
+
+ // Dispose the backend, then verify that the listener was notified of the unregistration
+ backend.dispose();
+
+ verify(listener, times(1)).notifyKvStateUnregistered(
+ eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"));
+
+ // Initialize again and inject the snapshot taken above
+ backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE);
+
+ backend.injectKeyValueStateSnapshots((HashMap) snapshot);
+
+ backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc);
+
+ // Verify that re-creating the state notified the listener a second time
+ verify(listener, times(2)).notifyKvStateRegistered(
+ eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"), any(KvStateID.class));
+
+
+ }
+
private static class AppendingReduce implements ReduceFunction<String> {
@Override
public String reduce(String value1, String value2) throws Exception {
@@ -744,4 +1064,52 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
return acc + "," + value;
}
}
+
+ /**
+ * Serializes the key and namespace, requests the serialized value for them from the KvState,
+ * and returns it deserialized with the value serializer, or null if the KvState returned null.
+ */
+ private static <V, K, N> V getSerializedValue(
+ KvState<K, N, ?, ?, ?> kvState,
+ K key,
+ TypeSerializer<K> keySerializer,
+ N namespace,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<V> valueSerializer) throws Exception {
+
+ byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
+ key, keySerializer, namespace, namespaceSerializer);
+
+ byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace);
+
+ if (serializedValue == null) {
+ return null;
+ } else {
+ return KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
+ }
+ }
+
+ /**
+ * Serializes the key and namespace, requests the serialized value for them from the KvState,
+ * and returns it deserialized as a list with the value serializer, or null if the KvState returned null.
+ */
+ private static <V, K, N> List<V> getSerializedList(
+ KvState<K, N, ?, ?, ?> kvState,
+ K key,
+ TypeSerializer<K> keySerializer,
+ N namespace,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<V> valueSerializer) throws Exception {
+
+ byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
+ key, keySerializer, namespace, namespaceSerializer);
+
+ byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace);
+
+ if (serializedValue == null) {
+ return null;
+ } else {
+ return KvStateRequestSerializer.deserializeList(serializedValue, valueSerializer);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a909adbf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 0269a34..15bb384 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -23,19 +23,19 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -189,7 +189,6 @@ public abstract class AbstractStreamOperator<OUT>
}
}
-
return state;
}
@@ -271,7 +270,7 @@ public abstract class AbstractStreamOperator<OUT>
* @throws Exception Thrown, if the state backend cannot create the key/value state.
*/
protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
- return getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, stateDescriptor);
+ return getStateBackend().getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/a909adbf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 2434843..98bb303 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -33,7 +33,6 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -45,6 +44,8 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -555,7 +556,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
- ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
+ ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergeStateDescriptor);
mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, mergeState);
mergeState.clear();
@@ -863,7 +864,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
for (Map.Entry<K, MergingWindowSet<W>> key: mergingWindowsByKey.entrySet()) {
setKeyContext(key.getKey());
- ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
+ ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergeStateDescriptor);
mergeState.clear();
key.getValue().persist(mergeState);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a909adbf/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 30ebb20..b2f6dbd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -29,14 +29,13 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
-
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemListState;
import org.junit.Test;
-
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -44,8 +43,12 @@ import java.util.Collections;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class StreamingRuntimeContextTest {
@@ -179,8 +182,10 @@ public class StreamingRuntimeContextTest {
public ListState<String> answer(InvocationOnMock invocationOnMock) throws Throwable {
ListStateDescriptor<String> descr =
(ListStateDescriptor<String>) invocationOnMock.getArguments()[0];
- return new MemListState<String, Void, String>(
- StringSerializer.INSTANCE, VoidSerializer.INSTANCE, descr);
+ MemListState<String, VoidNamespace, String> listState = new MemListState<>(
+ StringSerializer.INSTANCE, VoidNamespaceSerializer.INSTANCE, descr);
+ listState.setCurrentNamespace(VoidNamespace.INSTANCE);
+ return listState;
}
});