You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/09 14:47:37 UTC

[03/10] flink git commit: [FLINK-3779] [runtime] Add getSerializedValue(byte[]) to KvState

http://git-wip-us.apache.org/repos/asf/flink/blob/a909adbf/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 80f1de3..d59e17b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.state;
 
 import com.google.common.base.Joiner;
-
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
@@ -37,18 +36,34 @@ import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateRegistryListener;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.types.IntValue;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
-
-import static org.junit.Assert.*;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Generic tests for the partitioned state part of {@link AbstractStateBackend}.
@@ -74,24 +89,33 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 	}
 
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testValueState() throws Exception {
-
 		backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
 		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
-		ValueState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+		TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+		TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
+		TypeSerializer<String> valueSerializer = kvId.getSerializer();
+
+		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		KvState<Integer, VoidNamespace, ?, ?, B> kvState = (KvState<Integer, VoidNamespace, ?, ?, B>) state;
 
 		// some modifications to the state
 		backend.setCurrentKey(1);
 		assertNull(state.value());
+		assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		state.update("1");
 		backend.setCurrentKey(2);
 		assertNull(state.value());
+		assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		state.update("2");
 		backend.setCurrentKey(1);
 		assertEquals("1", state.value());
+		assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 		// draw a snapshot
 		HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2);
@@ -122,10 +146,13 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		// validate the original state
 		backend.setCurrentKey(1);
 		assertEquals("u1", state.value());
+		assertEquals("u1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		backend.setCurrentKey(2);
 		assertEquals("u2", state.value());
+		assertEquals("u2", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		backend.setCurrentKey(3);
 		assertEquals("u3", state.value());
+		assertEquals("u3", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 		backend.dispose();
 		backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
@@ -136,12 +163,16 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			snapshot1.get(key).discardState();
 		}
 
-		ValueState<String> restored1 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+		ValueState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState1 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored1;
 
 		backend.setCurrentKey(1);
 		assertEquals("1", restored1.value());
+		assertEquals("1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		backend.setCurrentKey(2);
 		assertEquals("2", restored1.value());
+		assertEquals("2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 		backend.dispose();
 		backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
@@ -152,14 +183,19 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			snapshot2.get(key).discardState();
 		}
 
-		ValueState<String> restored2 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+		ValueState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState2 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored2;
 
 		backend.setCurrentKey(1);
 		assertEquals("u1", restored2.value());
+		assertEquals("u1", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		backend.setCurrentKey(2);
 		assertEquals("u2", restored2.value());
+		assertEquals("u2", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		backend.setCurrentKey(3);
 		assertEquals("u3", restored2.value());
+		assertEquals("u3", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 	}
 
 	/**
@@ -169,14 +205,14 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 	 * @throws Exception
 	 */
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testValueStateNullUpdate() throws Exception {
-
 		// precondition: LongSerializer must fail on null value. this way the test would fail
 		// later if null values where actually stored in the state instead of acting as clear()
 		try {
 			LongSerializer.INSTANCE.serialize(null,
 				new DataOutputViewStreamWrapper(new ByteArrayOutputStream()));
-			fail("Should faill with NullPointerException");
+			fail("Should fail with NullPointerException");
 		} catch (NullPointerException e) {
 			// alrighty
 		}
@@ -186,7 +222,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		ValueStateDescriptor<Long> kvId = new ValueStateDescriptor<>("id", LongSerializer.INSTANCE, 42L);
 		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
-		ValueState<Long> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+		ValueState<Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
 		// some modifications to the state
 		backend.setCurrentKey(1);
@@ -218,7 +254,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			}
 		}
 
-
 		backend.dispose();
 		backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
@@ -228,7 +263,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			snapshot1.get(key).discardState();
 		}
 
-		backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+		backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 	}
 
 	@Test
@@ -238,18 +273,29 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
 			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
-			ListState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+			TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+			TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
+			TypeSerializer<String> valueSerializer = kvId.getSerializer();
+
+			ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			@SuppressWarnings("unchecked")
+			KvState<Integer, VoidNamespace, ?, ?, B> kvState = (KvState<Integer, VoidNamespace, ?, ?, B>) state;
 
 			Joiner joiner = Joiner.on(",");
 			// some modifications to the state
 			backend.setCurrentKey(1);
 			assertEquals(null, state.get());
+			assertEquals(null, getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			state.add("1");
 			backend.setCurrentKey(2);
 			assertEquals(null, state.get());
+			assertEquals(null, getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			state.add("2");
 			backend.setCurrentKey(1);
 			assertEquals("1", joiner.join(state.get()));
+			assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
 			// draw a snapshot
 			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2);
@@ -280,10 +326,13 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			// validate the original state
 			backend.setCurrentKey(1);
 			assertEquals("1,u1", joiner.join(state.get()));
+			assertEquals("1,u1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 			backend.setCurrentKey(2);
 			assertEquals("2,u2", joiner.join(state.get()));
+			assertEquals("2,u2", joiner.join(getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 			backend.setCurrentKey(3);
 			assertEquals("u3", joiner.join(state.get()));
+			assertEquals("u3", joiner.join(getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
 			backend.dispose();
 
@@ -295,12 +344,16 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 				snapshot1.get(key).discardState();
 			}
 
-			ListState<String> restored1 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+			ListState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			@SuppressWarnings("unchecked")
+			KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState1 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored1;
 
 			backend.setCurrentKey(1);
 			assertEquals("1", joiner.join(restored1.get()));
+			assertEquals("1", joiner.join(getSerializedList(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 			backend.setCurrentKey(2);
 			assertEquals("2", joiner.join(restored1.get()));
+			assertEquals("2", joiner.join(getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
 			backend.dispose();
 
@@ -312,14 +365,19 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 				snapshot2.get(key).discardState();
 			}
 
-			ListState<String> restored2 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+			ListState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			@SuppressWarnings("unchecked")
+			KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState2 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored2;
 
 			backend.setCurrentKey(1);
 			assertEquals("1,u1", joiner.join(restored2.get()));
+			assertEquals("1,u1", joiner.join(getSerializedList(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 			backend.setCurrentKey(2);
 			assertEquals("2,u2", joiner.join(restored2.get()));
+			assertEquals("2,u2", joiner.join(getSerializedList(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 			backend.setCurrentKey(3);
 			assertEquals("u3", joiner.join(restored2.get()));
+			assertEquals("u3", joiner.join(getSerializedList(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -328,23 +386,34 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 	}
 
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testReducingState() {
 		try {
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
 			ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
+			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
-			ReducingState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+			TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+			TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
+			TypeSerializer<String> valueSerializer = kvId.getSerializer();
+
+			ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			@SuppressWarnings("unchecked")
+			KvState<Integer, VoidNamespace, ?, ?, B> kvState = (KvState<Integer, VoidNamespace, ?, ?, B>) state;
 
 			// some modifications to the state
 			backend.setCurrentKey(1);
 			assertEquals(null, state.get());
+			assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			state.add("1");
 			backend.setCurrentKey(2);
 			assertEquals(null, state.get());
+			assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			state.add("2");
 			backend.setCurrentKey(1);
 			assertEquals("1", state.get());
+			assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 			// draw a snapshot
 			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2);
@@ -375,10 +444,13 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			// validate the original state
 			backend.setCurrentKey(1);
 			assertEquals("1,u1", state.get());
+			assertEquals("1,u1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			backend.setCurrentKey(2);
 			assertEquals("2,u2", state.get());
+			assertEquals("2,u2", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			backend.setCurrentKey(3);
 			assertEquals("u3", state.get());
+			assertEquals("u3", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 			backend.dispose();
 
@@ -390,12 +462,16 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 				snapshot1.get(key).discardState();
 			}
 
-			ReducingState<String> restored1 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+			ReducingState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			@SuppressWarnings("unchecked")
+			KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState1 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored1;
 
 			backend.setCurrentKey(1);
 			assertEquals("1", restored1.get());
+			assertEquals("1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			backend.setCurrentKey(2);
 			assertEquals("2", restored1.get());
+			assertEquals("2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 			backend.dispose();
 
@@ -407,15 +483,19 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 				snapshot2.get(key).discardState();
 			}
 
-			ReducingState<String> restored2 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
-
+			ReducingState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			@SuppressWarnings("unchecked")
+			KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState2 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored2;
 
 			backend.setCurrentKey(1);
 			assertEquals("1,u1", restored2.get());
+			assertEquals("1,u1", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			backend.setCurrentKey(2);
 			assertEquals("2,u2", restored2.get());
+			assertEquals("2,u2", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			backend.setCurrentKey(3);
 			assertEquals("u3", restored2.get());
+			assertEquals("u3", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -432,19 +512,29 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			FoldingStateDescriptor<Integer, String> kvId = new FoldingStateDescriptor<>("id",
 					"Fold-Initial:",
 					new AppendingFold(),
-				String.class);
+					String.class);
+			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
-			FoldingState<Integer, String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+			TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+			TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
+			TypeSerializer<String> valueSerializer = kvId.getSerializer();
+
+			FoldingState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			@SuppressWarnings("unchecked")
+			KvState<Integer, VoidNamespace, ?, ?, B> kvState = (KvState<Integer, VoidNamespace, ?, ?, B>) state;
 
 			// some modifications to the state
 			backend.setCurrentKey(1);
 			assertEquals(null, state.get());
+			assertEquals(null, getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			state.add(1);
 			backend.setCurrentKey(2);
 			assertEquals(null, state.get());
+			assertEquals(null, getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			state.add(2);
 			backend.setCurrentKey(1);
 			assertEquals("Fold-Initial:,1", state.get());
+			assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 			// draw a snapshot
 			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2);
@@ -476,10 +566,13 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			// validate the original state
 			backend.setCurrentKey(1);
 			assertEquals("Fold-Initial:,101", state.get());
+			assertEquals("Fold-Initial:,101", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			backend.setCurrentKey(2);
 			assertEquals("Fold-Initial:,2,102", state.get());
+			assertEquals("Fold-Initial:,2,102", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			backend.setCurrentKey(3);
 			assertEquals("Fold-Initial:,103", state.get());
+			assertEquals("Fold-Initial:,103", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 			backend.dispose();
 
@@ -491,12 +584,16 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 				snapshot1.get(key).discardState();
 			}
 
-			FoldingState<Integer, String> restored1 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+			FoldingState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			@SuppressWarnings("unchecked")
+			KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState1 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored1;
 
 			backend.setCurrentKey(1);
 			assertEquals("Fold-Initial:,1", restored1.get());
+			assertEquals("Fold-Initial:,1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			backend.setCurrentKey(2);
 			assertEquals("Fold-Initial:,2", restored1.get());
+			assertEquals("Fold-Initial:,2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 			backend.dispose();
 
@@ -509,14 +606,19 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			}
 
 			@SuppressWarnings("unchecked")
-			FoldingState<Integer, String> restored2 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+			FoldingState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			@SuppressWarnings("unchecked")
+			KvState<Integer, VoidNamespace, ?, ?, B> restoredKvState2 = (KvState<Integer, VoidNamespace, ?, ?, B>) restored2;
 
 			backend.setCurrentKey(1);
 			assertEquals("Fold-Initial:,101", restored2.get());
+			assertEquals("Fold-Initial:,101", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			backend.setCurrentKey(2);
 			assertEquals("Fold-Initial:,2,102", restored2.get());
+			assertEquals("Fold-Initial:,2,102", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 			backend.setCurrentKey(3);
 			assertEquals("Fold-Initial:,103", restored2.get());
+			assertEquals("Fold-Initial:,103", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -525,6 +627,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 	}
 
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testValueStateRestoreWithWrongSerializers() {
 		try {
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0),
@@ -534,7 +637,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
 			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 			
-			ValueState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+			ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
 			backend.setCurrentKey(1);
 			state.update("1");
@@ -567,7 +670,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			try {
 				kvId = new ValueStateDescriptor<>("id", fakeStringSerializer, null);
 
-				state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+				state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
 				state.value();
 
@@ -588,12 +691,13 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 	}
 
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testListStateRestoreWithWrongSerializers() {
 		try {
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
 			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
-			ListState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+			ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
 			backend.setCurrentKey(1);
 			state.add("1");
@@ -626,7 +730,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			try {
 				kvId = new ListStateDescriptor<>("id", fakeStringSerializer);
 
-				state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+				state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
 				state.get();
 
@@ -647,6 +751,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 	}
 
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testReducingStateRestoreWithWrongSerializers() {
 		try {
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
@@ -654,14 +759,13 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id",
 					new AppendingReduce(),
 					StringSerializer.INSTANCE);
-			ReducingState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+			ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
 			backend.setCurrentKey(1);
 			state.add("1");
 			backend.setCurrentKey(2);
 			state.add("2");
 
-
 			// draw a snapshot
 			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2);
 
@@ -688,7 +792,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			try {
 				kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), fakeStringSerializer);
 
-				state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+				state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
 				state.get();
 
@@ -715,7 +819,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
 		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
-		ValueState<IntValue> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+		ValueState<IntValue> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
 		backend.setCurrentKey(1);
 		IntValue default1 = state.value();
@@ -729,6 +833,222 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		assertFalse(default1 == default2);
 	}
 
+	/**
+	 * Previously, it was possible to create partitioned state with a
+	 * <code>null</code> namespace. This test makes sure that a <code>null</code>
+	 * namespace or namespace serializer now fails with a NullPointerException.
+	 */
+	@Test
+	public void testRequireNonNullNamespace() throws Exception {
+		backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+
+		ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
+		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		try {
+			backend.getPartitionedState(null, VoidNamespaceSerializer.INSTANCE, kvId);
+			fail("Did not throw expected NullPointerException");
+		} catch (NullPointerException ignored) { // expected: null namespace
+		}
+
+		try {
+			backend.getPartitionedState(VoidNamespace.INSTANCE, null, kvId);
+			fail("Did not throw expected NullPointerException");
+		} catch (NullPointerException ignored) { // expected: null namespace serializer
+		}
+
+		try {
+			backend.getPartitionedState(null, null, kvId);
+			fail("Did not throw expected NullPointerException");
+		} catch (NullPointerException ignored) { // expected: both arguments null
+		}
+	}
+
+	/**
+	 * Tests that queryable Value, List, Reducing and Folding {@link AbstractHeapState}
+	 * instances use a ConcurrentHashMap for the state map and the per-namespace map.
+	 */
+	@SuppressWarnings("unchecked")
+	protected static <B extends AbstractStateBackend> void testConcurrentMapIfQueryable(B backend) throws Exception {
+		{
+			// ValueState
+			ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>(
+					"value-state",
+					Integer.class,
+					-1);
+			desc.setQueryable("my-query");
+			desc.initializeSerializerUnlessSet(new ExecutionConfig());
+
+			ValueState<Integer> state = backend.getPartitionedState(
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE,
+					desc);
+
+			KvState<Integer, VoidNamespace, ?, ?, ?> kvState = (KvState<Integer, VoidNamespace, ?, ?, ?>) state;
+			assertTrue(kvState instanceof AbstractHeapState);
+
+			Map<VoidNamespace, Map<Integer, ?>> stateMap = ((AbstractHeapState) kvState).getStateMap();
+			assertTrue(stateMap instanceof ConcurrentHashMap);
+
+			kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+			kvState.setCurrentKey(1);
+			state.update(121818273);
+
+			Map<Integer, ?> namespaceMap = stateMap.get(VoidNamespace.INSTANCE);
+
+			assertNotNull("Value not set", namespaceMap);
+			assertTrue(namespaceMap instanceof ConcurrentHashMap);
+		}
+
+		{
+			// ListState
+			ListStateDescriptor<Integer> desc = new ListStateDescriptor<>("list-state", Integer.class);
+			desc.setQueryable("my-query");
+			desc.initializeSerializerUnlessSet(new ExecutionConfig());
+
+			ListState<Integer> state = backend.getPartitionedState(
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE,
+					desc);
+
+			KvState<Integer, VoidNamespace, ?, ?, ?> kvState = (KvState<Integer, VoidNamespace, ?, ?, ?>) state;
+			assertTrue(kvState instanceof AbstractHeapState);
+
+			Map<VoidNamespace, Map<Integer, ?>> stateMap = ((AbstractHeapState) kvState).getStateMap();
+			assertTrue(stateMap instanceof ConcurrentHashMap);
+
+			kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+			kvState.setCurrentKey(1);
+			state.add(121818273);
+
+			Map<Integer, ?> namespaceMap = stateMap.get(VoidNamespace.INSTANCE);
+
+			assertNotNull("List not set", namespaceMap);
+			assertTrue(namespaceMap instanceof ConcurrentHashMap);
+		}
+
+		{
+			// ReducingState
+			ReducingStateDescriptor<Integer> desc = new ReducingStateDescriptor<>(
+					"reducing-state", new ReduceFunction<Integer>() {
+				@Override
+				public Integer reduce(Integer value1, Integer value2) throws Exception {
+					return value1 + value2;
+				}
+			}, Integer.class);
+			desc.setQueryable("my-query");
+			desc.initializeSerializerUnlessSet(new ExecutionConfig());
+
+			ReducingState<Integer> state = backend.getPartitionedState(
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE,
+					desc);
+
+			KvState<Integer, VoidNamespace, ?, ?, ?> kvState = (KvState<Integer, VoidNamespace, ?, ?, ?>) state;
+			assertTrue(kvState instanceof AbstractHeapState);
+
+			Map<VoidNamespace, Map<Integer, ?>> stateMap = ((AbstractHeapState) kvState).getStateMap();
+			assertTrue(stateMap instanceof ConcurrentHashMap);
+
+			kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+			kvState.setCurrentKey(1);
+			state.add(121818273);
+
+			Map<Integer, ?> namespaceMap = stateMap.get(VoidNamespace.INSTANCE);
+
+			assertNotNull("Reduced value not set", namespaceMap);
+			assertTrue(namespaceMap instanceof ConcurrentHashMap);
+		}
+
+		{
+			// FoldingState
+			FoldingStateDescriptor<Integer, Integer> desc = new FoldingStateDescriptor<>(
+					"folding-state", 0, new FoldFunction<Integer, Integer>() {
+				@Override
+				public Integer fold(Integer accumulator, Integer value) throws Exception {
+					return accumulator + value;
+				}
+			}, Integer.class);
+			desc.setQueryable("my-query");
+			desc.initializeSerializerUnlessSet(new ExecutionConfig());
+
+			FoldingState<Integer, Integer> state = backend.getPartitionedState(
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE,
+					desc);
+
+			KvState<Integer, VoidNamespace, ?, ?, ?> kvState = (KvState<Integer, VoidNamespace, ?, ?, ?>) state;
+			assertTrue(kvState instanceof AbstractHeapState);
+
+			Map<VoidNamespace, Map<Integer, ?>> stateMap = ((AbstractHeapState) kvState).getStateMap();
+			assertTrue(stateMap instanceof ConcurrentHashMap);
+
+			kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+			kvState.setCurrentKey(1);
+			state.add(121818273);
+
+			Map<Integer, ?> namespaceMap = stateMap.get(VoidNamespace.INSTANCE);
+
+			assertNotNull("Folded value not set", namespaceMap);
+			assertTrue(namespaceMap instanceof ConcurrentHashMap);
+		}
+	}
+
+	/**
+	 * Tests that queryable state is registered with the KvStateRegistry, unregistered on dispose, and registered again after re-initialization.
+	 */
+	@Test
+	public void testQueryableStateRegistration() throws Exception {
+		DummyEnvironment env = new DummyEnvironment("test", 1, 0);
+		KvStateRegistry registry = env.getKvStateRegistry();
+
+		KvStateRegistryListener listener = mock(KvStateRegistryListener.class);
+		registry.registerListener(listener);
+
+		backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE);
+
+		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>(
+				"test",
+				IntSerializer.INSTANCE,
+				null);
+		desc.setQueryable("banana");
+
+		backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc);
+
+		// Verify registered
+		verify(listener, times(1)).notifyKvStateRegistered(
+				eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"), any(KvStateID.class));
+
+
+		HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot = backend
+				.snapshotPartitionedState(682375462379L, 4);
+
+		for (String key: snapshot.keySet()) { // replace asynchronous snapshots with their materialized form
+			if (snapshot.get(key) instanceof AsynchronousKvStateSnapshot) {
+				snapshot.put(key, ((AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) snapshot.get(key)).materialize());
+			}
+		}
+
+		// Verify unregistered
+		backend.dispose();
+
+		verify(listener, times(1)).notifyKvStateUnregistered(
+				eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"));
+
+		// Initialize again
+		backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE);
+
+		backend.injectKeyValueStateSnapshots((HashMap) snapshot);
+
+		backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc);
+
+		// Verify registered again
+		verify(listener, times(2)).notifyKvStateRegistered(
+				eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"), any(KvStateID.class));
+
+
+	}
+
 	private static class AppendingReduce implements ReduceFunction<String> {
 		@Override
 		public String reduce(String value1, String value2) throws Exception {
@@ -744,4 +1064,52 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			return acc + "," + value;
 		}
 	}
+
+	/**
+	 * Queries the given KvState with the serialized key and namespace and returns the
+	 * deserialized value, or <code>null</code> if the returned serialized value is null.
+	 */
+	private static <V, K, N> V getSerializedValue(
+			KvState<K, N, ?, ?, ?> kvState,
+			K key,
+			TypeSerializer<K> keySerializer,
+			N namespace,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<V> valueSerializer) throws Exception {
+
+		byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
+				key, keySerializer, namespace, namespaceSerializer);
+
+		byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace);
+
+		if (serializedValue == null) {
+			return null;
+		} else {
+			return KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
+		}
+	}
+
+	/**
+	 * Queries the given KvState with the serialized key and namespace and returns the
+	 * deserialized list, or <code>null</code> if the returned serialized value is null.
+	 */
+	private static <V, K, N> List<V> getSerializedList(
+			KvState<K, N, ?, ?, ?> kvState,
+			K key,
+			TypeSerializer<K> keySerializer,
+			N namespace,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<V> valueSerializer) throws Exception {
+
+		byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
+				key, keySerializer, namespace, namespaceSerializer);
+
+		byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace);
+
+		if (serializedValue == null) {
+			return null;
+		} else {
+			return KvStateRequestSerializer.deserializeList(serializedValue, valueSerializer);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a909adbf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 0269a34..15bb384 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -23,19 +23,19 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.runtime.state.KvStateSnapshot;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -189,7 +189,6 @@ public abstract class AbstractStreamOperator<OUT>
 			}
 		}
 
-
 		return state;
 	}
 	
@@ -271,7 +270,7 @@ public abstract class AbstractStreamOperator<OUT>
 	 * @throws Exception Thrown, if the state backend cannot create the key/value state.
 	 */
 	protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
-		return getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, stateDescriptor);
+		return getStateBackend().getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a909adbf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 2434843..98bb303 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -33,7 +33,6 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -45,6 +44,8 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -555,7 +556,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 			TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
 			ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
-			ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
+			ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergeStateDescriptor);
 
 			mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, mergeState);
 			mergeState.clear();
@@ -863,7 +864,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
 			for (Map.Entry<K, MergingWindowSet<W>> key: mergingWindowsByKey.entrySet()) {
 				setKeyContext(key.getKey());
-				ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
+				ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergeStateDescriptor);
 				mergeState.clear();
 				key.getValue().persist(mergeState);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/a909adbf/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 30ebb20..b2f6dbd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -29,14 +29,13 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
-
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemListState;
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -44,8 +43,12 @@ import java.util.Collections;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class StreamingRuntimeContextTest {
 	
@@ -179,8 +182,10 @@ public class StreamingRuntimeContextTest {
 					public ListState<String> answer(InvocationOnMock invocationOnMock) throws Throwable {
 						ListStateDescriptor<String> descr =
 							(ListStateDescriptor<String>) invocationOnMock.getArguments()[0];
-						return new MemListState<String, Void, String>(
-								StringSerializer.INSTANCE, VoidSerializer.INSTANCE, descr);
+						MemListState<String, VoidNamespace, String> listState = new MemListState<>(
+								StringSerializer.INSTANCE, VoidNamespaceSerializer.INSTANCE, descr);
+						listState.setCurrentNamespace(VoidNamespace.INSTANCE);
+						return listState;
 					}
 				});