You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/11 15:46:05 UTC

[05/14] flink git commit: [FLINK-7769][QS] Move queryable state outside the runtime.

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
new file mode 100644
index 0000000..c37c822
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -0,0 +1,728 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
+import org.apache.flink.queryablestate.UnknownKvStateID;
+import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
+import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.server.ChunkedByteBuf;
+import org.apache.flink.queryablestate.server.KvStateServerHandler;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateRegistryListener;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link KvStateServerHandler}.
+ */
+public class KvStateServerHandlerTest extends TestLogger {
+
+	/** Shared Thread pool for query execution. */
+	private static final ExecutorService TEST_THREAD_POOL = Executors.newSingleThreadExecutor();
+
+	private static final int READ_TIMEOUT_MILLIS = 10000;
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (TEST_THREAD_POOL != null) {
+			TEST_THREAD_POOL.shutdown();
+		}
+	}
+
+	/**
+	 * Tests a simple successful query via an EmbeddedChannel.
+	 */
+	@Test
+	public void testSimpleQuery() throws Exception {
+		KvStateRegistry registry = new KvStateRegistry();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+		// Register state
+		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+		desc.setQueryable("vanilla");
+
+		int numKeyGroups = 1;
+		AbstractStateBackend abstractBackend = new MemoryStateBackend();
+		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+		dummyEnv.setKvStateRegistry(registry);
+		AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+				dummyEnv,
+				new JobID(),
+				"test_op",
+				IntSerializer.INSTANCE,
+				numKeyGroups,
+				new KeyGroupRange(0, 0),
+				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+		final TestRegistryListener registryListener = new TestRegistryListener();
+		registry.registerListener(registryListener);
+
+		// Update the KvState and request it
+		int expectedValue = 712828289;
+
+		int key = 99812822;
+		backend.setCurrentKey(key);
+		ValueState<Integer> state = backend.getPartitionedState(
+				VoidNamespace.INSTANCE,
+				VoidNamespaceSerializer.INSTANCE,
+				desc);
+
+		state.update(expectedValue);
+
+		byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+				key,
+				IntSerializer.INSTANCE,
+				VoidNamespace.INSTANCE,
+				VoidNamespaceSerializer.INSTANCE);
+
+		long requestId = Integer.MAX_VALUE + 182828L;
+
+		assertTrue(registryListener.registrationName.equals("vanilla"));
+
+		ByteBuf request = MessageSerializer.serializeKvStateRequest(
+				channel.alloc(),
+				requestId,
+				registryListener.kvStateId,
+				serializedKeyAndNamespace);
+
+		// Write the request and wait for the response
+		channel.writeInbound(request);
+
+		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+		buf.skipBytes(4); // skip frame length
+
+		// Verify the response
+		assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+		KvStateRequestResult response = MessageSerializer.deserializeKvStateRequestResult(buf);
+
+		assertEquals(requestId, response.getRequestId());
+
+		int actualValue = KvStateSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE);
+		assertEquals(expectedValue, actualValue);
+
+		assertEquals(stats.toString(), 1, stats.getNumRequests());
+
+		// Wait for async successful request report
+		long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+		while (stats.getNumSuccessful() != 1 && System.nanoTime() <= deadline) {
+			Thread.sleep(10);
+		}
+
+		assertEquals(stats.toString(), 1, stats.getNumSuccessful());
+	}
+
+	/**
+	 * Tests the failure response with {@link UnknownKvStateID} as cause on
+	 * queries for unregistered KvStateIDs.
+	 */
+	@Test
+	public void testQueryUnknownKvStateID() throws Exception {
+		KvStateRegistry registry = new KvStateRegistry();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+		long requestId = Integer.MAX_VALUE + 182828L;
+		ByteBuf request = MessageSerializer.serializeKvStateRequest(
+				channel.alloc(),
+				requestId,
+				new KvStateID(),
+				new byte[0]);
+
+		// Write the request and wait for the response
+		channel.writeInbound(request);
+
+		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+		buf.skipBytes(4); // skip frame length
+
+		// Verify the response
+		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+		KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+
+		assertEquals(requestId, response.getRequestId());
+
+		assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKvStateID);
+
+		assertEquals(1, stats.getNumRequests());
+		assertEquals(1, stats.getNumFailed());
+	}
+
+	/**
+	 * Tests the failure response with {@link UnknownKeyOrNamespace} as cause
+	 * on queries for non-existing keys.
+	 */
+	@Test
+	public void testQueryUnknownKey() throws Exception {
+		KvStateRegistry registry = new KvStateRegistry();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+		int numKeyGroups = 1;
+		AbstractStateBackend abstractBackend = new MemoryStateBackend();
+		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+		dummyEnv.setKvStateRegistry(registry);
+		KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+				dummyEnv,
+				new JobID(),
+				"test_op",
+				IntSerializer.INSTANCE,
+				numKeyGroups,
+				new KeyGroupRange(0, 0),
+				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+		final TestRegistryListener registryListener = new TestRegistryListener();
+		registry.registerListener(registryListener);
+
+		// Register state
+		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+		desc.setQueryable("vanilla");
+
+		backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc);
+
+		byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+				1238283,
+				IntSerializer.INSTANCE,
+				VoidNamespace.INSTANCE,
+				VoidNamespaceSerializer.INSTANCE);
+
+		long requestId = Integer.MAX_VALUE + 22982L;
+
+		assertTrue(registryListener.registrationName.equals("vanilla"));
+
+		ByteBuf request = MessageSerializer.serializeKvStateRequest(
+				channel.alloc(),
+				requestId,
+				registryListener.kvStateId,
+				serializedKeyAndNamespace);
+
+		// Write the request and wait for the response
+		channel.writeInbound(request);
+
+		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+		buf.skipBytes(4); // skip frame length
+
+		// Verify the response
+		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+		KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+
+		assertEquals(requestId, response.getRequestId());
+
+		assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKeyOrNamespace);
+
+		assertEquals(1, stats.getNumRequests());
+		assertEquals(1, stats.getNumFailed());
+	}
+
+	/**
+	 * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])}
+	 * call.
+	 */
+	@Test
+	public void testFailureOnGetSerializedValue() throws Exception {
+		KvStateRegistry registry = new KvStateRegistry();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+		// Failing KvState
+		InternalKvState<?> kvState = mock(InternalKvState.class);
+		when(kvState.getSerializedValue(any(byte[].class)))
+				.thenThrow(new RuntimeException("Expected test Exception"));
+
+		KvStateID kvStateId = registry.registerKvState(
+				new JobID(),
+				new JobVertexID(),
+				new KeyGroupRange(0, 0),
+				"vanilla",
+				kvState);
+
+		ByteBuf request = MessageSerializer.serializeKvStateRequest(
+				channel.alloc(),
+				282872,
+				kvStateId,
+				new byte[0]);
+
+		// Write the request and wait for the response
+		channel.writeInbound(request);
+
+		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+		buf.skipBytes(4); // skip frame length
+
+		// Verify the response
+		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+		KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+
+		assertTrue(response.getCause().getMessage().contains("Expected test Exception"));
+
+		assertEquals(1, stats.getNumRequests());
+		assertEquals(1, stats.getNumFailed());
+	}
+
+	/**
+	 * Tests that the channel is closed if an Exception reaches the channel
+	 * handler.
+	 */
+	@Test
+	public void testCloseChannelOnExceptionCaught() throws Exception {
+		KvStateRegistry registry = new KvStateRegistry();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+		channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception"));
+
+		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+		buf.skipBytes(4); // skip frame length
+
+		// Verify the response
+		assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
+		Throwable response = MessageSerializer.deserializeServerFailure(buf);
+
+		assertTrue(response.getMessage().contains("Expected test Exception"));
+
+		channel.closeFuture().await(READ_TIMEOUT_MILLIS);
+		assertFalse(channel.isActive());
+	}
+
+	/**
+	 * Tests the failure response on a rejected execution, because the query
+	 * executor has been closed.
+	 */
+	@Test
+	public void testQueryExecutorShutDown() throws Exception {
+		KvStateRegistry registry = new KvStateRegistry();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		ExecutorService closedExecutor = Executors.newSingleThreadExecutor();
+		closedExecutor.shutdown();
+		assertTrue(closedExecutor.isShutdown());
+
+		KvStateServerHandler handler = new KvStateServerHandler(registry, closedExecutor, stats);
+		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+		int numKeyGroups = 1;
+		AbstractStateBackend abstractBackend = new MemoryStateBackend();
+		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+		dummyEnv.setKvStateRegistry(registry);
+		KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+				dummyEnv,
+				new JobID(),
+				"test_op",
+				IntSerializer.INSTANCE,
+				numKeyGroups,
+				new KeyGroupRange(0, 0),
+				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+		final TestRegistryListener registryListener = new TestRegistryListener();
+		registry.registerListener(registryListener);
+
+		// Register state
+		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+		desc.setQueryable("vanilla");
+
+		backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc);
+
+		assertTrue(registryListener.registrationName.equals("vanilla"));
+
+		ByteBuf request = MessageSerializer.serializeKvStateRequest(
+				channel.alloc(),
+				282872,
+				registryListener.kvStateId,
+				new byte[0]);
+
+		// Write the request and wait for the response
+		channel.writeInbound(request);
+
+		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+		buf.skipBytes(4); // skip frame length
+
+		// Verify the response
+		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+		KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+
+		assertTrue(response.getCause().getMessage().contains("RejectedExecutionException"));
+
+		assertEquals(1, stats.getNumRequests());
+		assertEquals(1, stats.getNumFailed());
+	}
+
+	/**
+	 * Tests response on unexpected messages.
+	 */
+	@Test
+	public void testUnexpectedMessage() throws Exception {
+		KvStateRegistry registry = new KvStateRegistry();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+		// Write the request and wait for the response
+		ByteBuf unexpectedMessage = Unpooled.buffer(8);
+		unexpectedMessage.writeInt(4);
+		unexpectedMessage.writeInt(123238213);
+
+		channel.writeInbound(unexpectedMessage);
+
+		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+		buf.skipBytes(4); // skip frame length
+
+		// Verify the response
+		assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
+		Throwable response = MessageSerializer.deserializeServerFailure(buf);
+
+		assertEquals(0, stats.getNumRequests());
+		assertEquals(0, stats.getNumFailed());
+
+		unexpectedMessage = MessageSerializer.serializeKvStateRequestResult(
+				channel.alloc(),
+				192,
+				new byte[0]);
+
+		channel.writeInbound(unexpectedMessage);
+
+		buf = (ByteBuf) readInboundBlocking(channel);
+		buf.skipBytes(4); // skip frame length
+
+		// Verify the response
+		assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
+		response = MessageSerializer.deserializeServerFailure(buf);
+
+		assertTrue("Unexpected failure cause " + response.getClass().getName(), response instanceof IllegalArgumentException);
+
+		assertEquals(0, stats.getNumRequests());
+		assertEquals(0, stats.getNumFailed());
+	}
+
+	/**
+	 * Tests that incoming buffer instances are recycled.
+	 */
+	@Test
+	public void testIncomingBufferIsRecycled() throws Exception {
+		KvStateRegistry registry = new KvStateRegistry();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+		ByteBuf request = MessageSerializer.serializeKvStateRequest(
+				channel.alloc(),
+				282872,
+				new KvStateID(),
+				new byte[0]);
+
+		assertEquals(1, request.refCnt());
+
+		// Write regular request
+		channel.writeInbound(request);
+		assertEquals("Buffer not recycled", 0, request.refCnt());
+
+		// Write unexpected msg
+		ByteBuf unexpected = channel.alloc().buffer(8);
+		unexpected.writeInt(4);
+		unexpected.writeInt(4);
+
+		assertEquals(1, unexpected.refCnt());
+
+		channel.writeInbound(unexpected);
+		assertEquals("Buffer not recycled", 0, unexpected.refCnt());
+	}
+
+	/**
+	 * Tests the failure response if the serializers don't match.
+	 */
+	@Test
+	public void testSerializerMismatch() throws Exception {
+		KvStateRegistry registry = new KvStateRegistry();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+		int numKeyGroups = 1;
+		AbstractStateBackend abstractBackend = new MemoryStateBackend();
+		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+		dummyEnv.setKvStateRegistry(registry);
+		AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+				dummyEnv,
+				new JobID(),
+				"test_op",
+				IntSerializer.INSTANCE,
+				numKeyGroups,
+				new KeyGroupRange(0, 0),
+				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+		final TestRegistryListener registryListener = new TestRegistryListener();
+		registry.registerListener(registryListener);
+
+		// Register state
+		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+		desc.setQueryable("vanilla");
+
+		ValueState<Integer> state = backend.getPartitionedState(
+				VoidNamespace.INSTANCE,
+				VoidNamespaceSerializer.INSTANCE,
+				desc);
+
+		int key = 99812822;
+
+		// Update the KvState
+		backend.setCurrentKey(key);
+		state.update(712828289);
+
+		byte[] wrongKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+				"wrong-key-type",
+				StringSerializer.INSTANCE,
+				"wrong-namespace-type",
+				StringSerializer.INSTANCE);
+
+		byte[] wrongNamespace = KvStateSerializer.serializeKeyAndNamespace(
+				key,
+				IntSerializer.INSTANCE,
+				"wrong-namespace-type",
+				StringSerializer.INSTANCE);
+
+		assertTrue(registryListener.registrationName.equals("vanilla"));
+		ByteBuf request = MessageSerializer.serializeKvStateRequest(
+				channel.alloc(),
+				182828,
+				registryListener.kvStateId,
+				wrongKeyAndNamespace);
+
+		// Write the request and wait for the response
+		channel.writeInbound(request);
+
+		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+		buf.skipBytes(4); // skip frame length
+
+		// Verify the response
+		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+		KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+		assertEquals(182828, response.getRequestId());
+		assertTrue(response.getCause().getMessage().contains("IOException"));
+
+		// Repeat with wrong namespace only
+		request = MessageSerializer.serializeKvStateRequest(
+				channel.alloc(),
+				182829,
+				registryListener.kvStateId,
+				wrongNamespace);
+
+		// Write the request and wait for the response
+		channel.writeInbound(request);
+
+		buf = (ByteBuf) readInboundBlocking(channel);
+		buf.skipBytes(4); // skip frame length
+
+		// Verify the response
+		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+		response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+		assertEquals(182829, response.getRequestId());
+		assertTrue(response.getCause().getMessage().contains("IOException"));
+
+		assertEquals(2, stats.getNumRequests());
+		assertEquals(2, stats.getNumFailed());
+	}
+
+	/**
+	 * Tests that large responses are chunked.
+	 */
+	@Test
+	public void testChunkedResponse() throws Exception {
+		KvStateRegistry registry = new KvStateRegistry();
+		KvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+		int numKeyGroups = 1;
+		AbstractStateBackend abstractBackend = new MemoryStateBackend();
+		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+		dummyEnv.setKvStateRegistry(registry);
+		AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+				dummyEnv,
+				new JobID(),
+				"test_op",
+				IntSerializer.INSTANCE,
+				numKeyGroups,
+				new KeyGroupRange(0, 0),
+				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+		final TestRegistryListener registryListener = new TestRegistryListener();
+		registry.registerListener(registryListener);
+
+		// Register state
+		ValueStateDescriptor<byte[]> desc = new ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE);
+		desc.setQueryable("vanilla");
+
+		ValueState<byte[]> state = backend.getPartitionedState(
+				VoidNamespace.INSTANCE,
+				VoidNamespaceSerializer.INSTANCE,
+				desc);
+
+		// Update KvState
+		byte[] bytes = new byte[2 * channel.config().getWriteBufferHighWaterMark()];
+
+		byte current = 0;
+		for (int i = 0; i < bytes.length; i++) {
+			bytes[i] = current++;
+		}
+
+		int key = 99812822;
+		backend.setCurrentKey(key);
+		state.update(bytes);
+
+		// Request
+		byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+				key,
+				IntSerializer.INSTANCE,
+				VoidNamespace.INSTANCE,
+				VoidNamespaceSerializer.INSTANCE);
+
+		long requestId = Integer.MAX_VALUE + 182828L;
+
+		assertTrue(registryListener.registrationName.equals("vanilla"));
+
+		ByteBuf request = MessageSerializer.serializeKvStateRequest(
+				channel.alloc(),
+				requestId,
+				registryListener.kvStateId,
+				serializedKeyAndNamespace);
+
+		// Write the request and wait for the response
+		channel.writeInbound(request);
+
+		Object msg = readInboundBlocking(channel);
+		assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Queries the embedded channel for data.
+	 */
+	private Object readInboundBlocking(EmbeddedChannel channel) throws InterruptedException, TimeoutException {
+		final int sleepMillis = 50;
+
+		int sleptMillis = 0;
+
+		Object msg = null;
+		while (sleptMillis < READ_TIMEOUT_MILLIS &&
+				(msg = channel.readOutbound()) == null) {
+
+			Thread.sleep(sleepMillis);
+			sleptMillis += sleepMillis;
+		}
+
+		if (msg == null) {
+			throw new TimeoutException();
+		} else {
+			return msg;
+		}
+	}
+
+	/**
+	 * Frame length decoder (expected by the serialized messages).
+	 */
+	private ChannelHandler getFrameDecoder() {
+		return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4);
+	}
+
+	/**
+	 * A listener that keeps the last updated KvState information so that a test
+	 * can retrieve it.
+	 */
+	static class TestRegistryListener implements KvStateRegistryListener {
+		volatile JobVertexID jobVertexID;
+		volatile KeyGroupRange keyGroupIndex;
+		volatile String registrationName;
+		volatile KvStateID kvStateId;
+
+		@Override
+		public void notifyKvStateRegistered(JobID jobId,
+				JobVertexID jobVertexId,
+				KeyGroupRange keyGroupRange,
+				String registrationName,
+				KvStateID kvStateId) {
+			this.jobVertexID = jobVertexId;
+			this.keyGroupIndex = keyGroupRange;
+			this.registrationName = registrationName;
+			this.kvStateId = kvStateId;
+		}
+
+		@Override
+		public void notifyKvStateUnregistered(JobID jobId,
+				JobVertexID jobVertexId,
+				KeyGroupRange keyGroupRange,
+				String registrationName) {
+
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
new file mode 100644
index 0000000..9332e68
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link KvStateServer}.
+ */
+public class KvStateServerTest {
+
+	// Thread pool for client bootstrap (shared between tests)
+	private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
+
+	private static final int TIMEOUT_MILLIS = 10000;
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (NIO_GROUP != null) {
+			NIO_GROUP.shutdownGracefully();
+		}
+	}
+
+	/**
+	 * Tests a simple successful query via a SocketChannel.
+	 */
+	@Test
+	public void testSimpleRequest() throws Exception {
+		KvStateServer server = null;
+		Bootstrap bootstrap = null;
+		try {
+			KvStateRegistry registry = new KvStateRegistry();
+			KvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+			server = new KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registry, stats);
+			server.start();
+
+			KvStateServerAddress serverAddress = server.getAddress();
+			int numKeyGroups = 1;
+			AbstractStateBackend abstractBackend = new MemoryStateBackend();
+			DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+			dummyEnv.setKvStateRegistry(registry);
+			AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+					dummyEnv,
+					new JobID(),
+					"test_op",
+					IntSerializer.INSTANCE,
+					numKeyGroups,
+					new KeyGroupRange(0, 0),
+					registry.createTaskRegistry(new JobID(), new JobVertexID()));
+
+			final KvStateServerHandlerTest.TestRegistryListener registryListener =
+					new KvStateServerHandlerTest.TestRegistryListener();
+
+			registry.registerListener(registryListener);
+
+			ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+			desc.setQueryable("vanilla");
+
+			ValueState<Integer> state = backend.getPartitionedState(
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE,
+					desc);
+
+			// Update KvState
+			int expectedValue = 712828289;
+
+			int key = 99812822;
+			backend.setCurrentKey(key);
+			state.update(expectedValue);
+
+			// Request
+			byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+					key,
+					IntSerializer.INSTANCE,
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE);
+
+			// Connect to the server
+			final BlockingQueue<ByteBuf> responses = new LinkedBlockingQueue<>();
+			bootstrap = createBootstrap(
+					new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
+					new ChannelInboundHandlerAdapter() {
+						@Override
+						public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+							responses.add((ByteBuf) msg);
+						}
+					});
+
+			Channel channel = bootstrap
+					.connect(serverAddress.getHost(), serverAddress.getPort())
+					.sync().channel();
+
+			long requestId = Integer.MAX_VALUE + 182828L;
+
+			assertTrue(registryListener.registrationName.equals("vanilla"));
+			ByteBuf request = MessageSerializer.serializeKvStateRequest(
+					channel.alloc(),
+					requestId,
+					registryListener.kvStateId,
+					serializedKeyAndNamespace);
+
+			channel.writeAndFlush(request);
+
+			ByteBuf buf = responses.poll(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+			assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+			KvStateRequestResult response = MessageSerializer.deserializeKvStateRequestResult(buf);
+
+			assertEquals(requestId, response.getRequestId());
+			int actualValue = KvStateSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE);
+			assertEquals(expectedValue, actualValue);
+		} finally {
+			if (server != null) {
+				server.shutDown();
+			}
+
+			if (bootstrap != null) {
+				EventLoopGroup group = bootstrap.group();
+				if (group != null) {
+					group.shutdownGracefully();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Creates a client bootstrap.
+	 */
+	private Bootstrap createBootstrap(final ChannelHandler... handlers) {
+		return new Bootstrap().group(NIO_GROUP).channel(NioSocketChannel.class)
+				.handler(new ChannelInitializer<SocketChannel>() {
+					@Override
+					protected void initChannel(SocketChannel ch) throws Exception {
+						ch.pipeline().addLast(handlers);
+					}
+				});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
new file mode 100644
index 0000000..5d4a861
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.UnknownKvStateID;
+import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocation;
+import org.apache.flink.queryablestate.client.KvStateClient;
+import org.apache.flink.queryablestate.client.KvStateLocationLookupService;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.heap.HeapValueState;
+import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.MathUtils;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link QueryableStateClient}.
+ */
+public class QueryableStateClientTest {
+
+	private static final ActorSystem testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+	private static final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS);
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (testActorSystem != null) {
+			testActorSystem.shutdown();
+		}
+	}
+
+	/**
+	 * All failures should lead to a retry with a forced location lookup.
+	 *
+	 * <p>UnknownKvStateID, UnknownKvStateKeyGroupLocation, UnknownKvStateLocation,
+	 * ConnectException are checked explicitly as these indicate out-of-sync
+	 * KvStateLocation.
+	 */
+	@Test
+	public void testForceLookupOnOutdatedLocation() throws Exception {
+		KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
+		KvStateClient networkClient = mock(KvStateClient.class);
+
+		QueryableStateClient client = new QueryableStateClient(
+				lookupService,
+				networkClient,
+				testActorSystem.dispatcher());
+
+		try {
+			JobID jobId = new JobID();
+			int numKeyGroups = 4;
+
+			//
+			// UnknownKvStateLocation
+			//
+			String query1 = "lucky";
+
+			Future<KvStateLocation> unknownKvStateLocation = Futures.failed(
+					new UnknownKvStateLocation(query1));
+
+			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query1)))
+					.thenReturn(unknownKvStateLocation);
+
+			Future<Integer> result = client.getKvState(
+					jobId,
+					query1,
+					0,
+					BasicTypeInfo.INT_TYPE_INFO,
+					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+			try {
+				Await.result(result, timeout);
+				fail("Did not throw expected UnknownKvStateLocation exception");
+			} catch (UnknownKvStateLocation ignored) {
+				// Expected
+			}
+
+			verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query1));
+
+			//
+			// UnknownKvStateKeyGroupLocation
+			//
+			String query2 = "unlucky";
+
+			Future<KvStateLocation> unknownKeyGroupLocation = Futures.successful(
+					new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query2));
+
+			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query2)))
+					.thenReturn(unknownKeyGroupLocation);
+
+			result = client.getKvState(
+					jobId,
+					query2,
+					0,
+					BasicTypeInfo.INT_TYPE_INFO,
+					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+			try {
+				Await.result(result, timeout);
+				fail("Did not throw expected UnknownKvStateKeyGroupLocation exception");
+			} catch (UnknownKvStateKeyGroupLocation ignored) {
+				// Expected
+			}
+
+			verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query2));
+
+			//
+			// UnknownKvStateID
+			//
+			String query3 = "water";
+			KvStateID kvStateId = new KvStateID();
+			Future<byte[]> unknownKvStateId = Futures.failed(new UnknownKvStateID(kvStateId));
+
+			KvStateServerAddress serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 12323);
+			KvStateLocation location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query3);
+			for (int i = 0; i < numKeyGroups; i++) {
+				location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress);
+			}
+
+			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query3)))
+					.thenReturn(Futures.successful(location));
+
+			when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class)))
+					.thenReturn(unknownKvStateId);
+
+			result = client.getKvState(
+					jobId,
+					query3,
+					0,
+					BasicTypeInfo.INT_TYPE_INFO,
+					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+			try {
+				Await.result(result, timeout);
+				fail("Did not throw expected UnknownKvStateID exception");
+			} catch (UnknownKvStateID ignored) {
+				// Expected
+			}
+
+			verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query3));
+
+			//
+			// ConnectException
+			//
+			String query4 = "space";
+			Future<byte[]> connectException = Futures.failed(new ConnectException());
+			kvStateId = new KvStateID();
+
+			serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 11123);
+			location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query4);
+			for (int i = 0; i < numKeyGroups; i++) {
+				location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress);
+			}
+
+			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query4)))
+					.thenReturn(Futures.successful(location));
+
+			when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class)))
+					.thenReturn(connectException);
+
+			result = client.getKvState(
+					jobId,
+					query4,
+					0,
+					BasicTypeInfo.INT_TYPE_INFO,
+					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+			try {
+				Await.result(result, timeout);
+				fail("Did not throw expected ConnectException exception");
+			} catch (ConnectException ignored) {
+				// Expected
+			}
+
+			verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query4));
+
+			//
+			// Other Exceptions don't lead to a retry no retry
+			//
+			String query5 = "universe";
+			Future<KvStateLocation> exception = Futures.failed(new RuntimeException("Test exception"));
+			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query5)))
+					.thenReturn(exception);
+
+			client.getKvState(
+					jobId,
+					query5,
+					0,
+					BasicTypeInfo.INT_TYPE_INFO,
+					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+			verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId), eq(query5));
+		} finally {
+			client.shutDown();
+		}
+	}
+
+	/**
+	 * Tests queries against multiple servers.
+	 *
+	 * <p>The servers are populated with different keys and the client queries
+	 * all available keys from all servers.
+	 */
+	@Test
+	public void testIntegrationWithKvStateServer() throws Exception {
+		// Config
+		int numServers = 2;
+		int numKeys = 1024;
+		int numKeyGroups = 1;
+
+		JobID jobId = new JobID();
+		JobVertexID jobVertexId = new JobVertexID();
+
+		KvStateServer[] servers = new KvStateServer[numServers];
+		AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers];
+
+		QueryableStateClient client = null;
+		KvStateClient networkClient = null;
+		AtomicKvStateRequestStats networkClientStats = new AtomicKvStateRequestStats();
+
+		MemoryStateBackend backend = new MemoryStateBackend();
+		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+
+		AbstractKeyedStateBackend<Integer> keyedStateBackend = backend.createKeyedStateBackend(dummyEnv,
+				new JobID(),
+				"test_op",
+				IntSerializer.INSTANCE,
+				numKeyGroups,
+				new KeyGroupRange(0, 0),
+				new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
+
+		try {
+			KvStateRegistry[] registries = new KvStateRegistry[numServers];
+			KvStateID[] kvStateIds = new KvStateID[numServers];
+			List<HeapValueState<Integer, VoidNamespace, Integer>> kvStates = new ArrayList<>();
+
+			// Start the servers
+			for (int i = 0; i < numServers; i++) {
+				registries[i] = new KvStateRegistry();
+				serverStats[i] = new AtomicKvStateRequestStats();
+				servers[i] = new KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registries[i], serverStats[i]);
+				servers[i].start();
+				ValueStateDescriptor<Integer> descriptor =
+						new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+
+				RegisteredKeyedBackendStateMetaInfo<VoidNamespace, Integer> registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+						descriptor.getType(),
+						descriptor.getName(),
+						VoidNamespaceSerializer.INSTANCE,
+						IntSerializer.INSTANCE);
+
+				// Register state
+				HeapValueState<Integer, VoidNamespace, Integer> kvState = new HeapValueState<>(
+						descriptor,
+						new NestedMapsStateTable<>(keyedStateBackend, registeredKeyedBackendStateMetaInfo),
+						IntSerializer.INSTANCE,
+						VoidNamespaceSerializer.INSTANCE);
+
+				kvStates.add(kvState);
+
+				kvStateIds[i] = registries[i].registerKvState(
+						jobId,
+						new JobVertexID(),
+						new KeyGroupRange(i, i),
+						"choco",
+						kvState);
+			}
+
+			int[] expectedRequests = new int[numServers];
+
+			for (int key = 0; key < numKeys; key++) {
+				int targetKeyGroupIndex = MathUtils.murmurHash(key) % numServers;
+				expectedRequests[targetKeyGroupIndex]++;
+
+				HeapValueState<Integer, VoidNamespace, Integer> kvState = kvStates.get(targetKeyGroupIndex);
+
+				keyedStateBackend.setCurrentKey(key);
+				kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+				kvState.update(1337 + key);
+			}
+
+			// Location lookup service
+			KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numServers, "choco");
+			for (int keyGroupIndex = 0; keyGroupIndex < numServers; keyGroupIndex++) {
+				location.registerKvState(new KeyGroupRange(keyGroupIndex, keyGroupIndex), kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress());
+			}
+
+			KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
+			when(lookupService.getKvStateLookupInfo(eq(jobId), eq("choco")))
+					.thenReturn(Futures.successful(location));
+
+			// The client
+			networkClient = new KvStateClient(1, networkClientStats);
+
+			client = new QueryableStateClient(lookupService, networkClient, testActorSystem.dispatcher());
+
+			// Send all queries
+			List<Future<Integer>> futures = new ArrayList<>(numKeys);
+			for (int key = 0; key < numKeys; key++) {
+				ValueStateDescriptor<Integer> descriptor =
+						new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+				futures.add(client.getKvState(
+						jobId,
+						"choco",
+						key,
+						BasicTypeInfo.INT_TYPE_INFO,
+						descriptor));
+			}
+
+			// Verify results
+			Future<Iterable<Integer>> future = Futures.sequence(futures, testActorSystem.dispatcher());
+			Iterable<Integer> results = Await.result(future, timeout);
+
+			int index = 0;
+			for (int buffer : results) {
+				assertEquals(1337 + index, buffer);
+				index++;
+			}
+
+			// Verify requests
+			for (int i = 0; i < numServers; i++) {
+				int numRetries = 10;
+				for (int retry = 0; retry < numRetries; retry++) {
+					try {
+						assertEquals("Unexpected number of requests", expectedRequests[i], serverStats[i].getNumRequests());
+						assertEquals("Unexpected success requests", expectedRequests[i], serverStats[i].getNumSuccessful());
+						assertEquals("Unexpected failed requests", 0, serverStats[i].getNumFailed());
+						break;
+					} catch (Throwable t) {
+						// Retry
+						if (retry == numRetries - 1) {
+							throw t;
+						} else {
+							Thread.sleep(100);
+						}
+					}
+				}
+			}
+		} finally {
+			if (client != null) {
+				client.shutDown();
+			}
+
+			if (networkClient != null) {
+				networkClient.shutDown();
+			}
+
+			for (KvStateServer server : servers) {
+				if (server != null) {
+					server.shutDown();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Tests that the QueryableState client correctly caches location lookups
+	 * keyed by both job and name. This test is mainly due to a previous bug due
+	 * to which cache entries were by name only. This is a problem, because the
+	 * same client can be used to query multiple jobs.
+	 */
+	@Test
+	public void testLookupMultipleJobIds() throws Exception {
+		String name = "unique-per-job";
+
+		// Exact contents don't matter here
+		KvStateLocation location = new KvStateLocation(new JobID(), new JobVertexID(), 1, name);
+		location.registerKvState(new KeyGroupRange(0, 0), new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));
+
+		JobID jobId1 = new JobID();
+		JobID jobId2 = new JobID();
+
+		KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
+
+		when(lookupService.getKvStateLookupInfo(any(JobID.class), anyString()))
+				.thenReturn(Futures.successful(location));
+
+		KvStateClient networkClient = mock(KvStateClient.class);
+		when(networkClient.getKvState(any(KvStateServerAddress.class), any(KvStateID.class), any(byte[].class)))
+				.thenReturn(Futures.successful(new byte[0]));
+
+		QueryableStateClient client = new QueryableStateClient(
+				lookupService,
+				networkClient,
+				testActorSystem.dispatcher());
+
+		ValueStateDescriptor<Integer> stateDesc = new ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
+
+		// Query ies with same name, but different job IDs should lead to a
+		// single lookup per query and job ID.
+		client.getKvState(jobId1, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc);
+		client.getKvState(jobId2, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc);
+
+		verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId1), eq(name));
+		verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId2), eq(name));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties b/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..10792cd
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
+log4j.logger.org.apache.zookeeper=OFF

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/pom.xml
----------------------------------------------------------------------
diff --git a/flink-queryable-state/pom.xml b/flink-queryable-state/pom.xml
new file mode 100644
index 0000000..e9e7496
--- /dev/null
+++ b/flink-queryable-state/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-parent</artifactId>
+        <version>1.4-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-queryable-state</artifactId>
+    <name>flink-queryable-state</name>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>flink-queryable-state-java</module>
+       <!-- <module>flink-state-client-scala</module>-->
+    </modules>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 9193859..53503ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
deleted file mode 100644
index a37a3ac..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.util.Preconditions;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.Recover;
-import akka.pattern.Patterns;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.UUID;
-import java.util.concurrent.Callable;
-
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-/**
- * Akka-based {@link KvStateLocationLookupService} that retrieves the current
- * JobManager address and uses it for lookups.
- */
-class AkkaKvStateLocationLookupService implements KvStateLocationLookupService, LeaderRetrievalListener {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KvStateLocationLookupService.class);
-
-	/** Future returned when no JobManager is available. */
-	private static final Future<ActorGateway> UNKNOWN_JOB_MANAGER = Futures.failed(new UnknownJobManager());
-
-	/** Leader retrieval service to retrieve the current job manager. */
-	private final LeaderRetrievalService leaderRetrievalService;
-
-	/** The actor system used to resolve the JobManager address. */
-	private final ActorSystem actorSystem;
-
-	/** Timeout for JobManager ask-requests. */
-	private final FiniteDuration askTimeout;
-
-	/** Retry strategy factory on future failures. */
-	private final LookupRetryStrategyFactory retryStrategyFactory;
-
-	/** Current job manager future. */
-	private volatile Future<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;
-
-	/**
-	 * Creates the Akka-based {@link KvStateLocationLookupService}.
-	 *
-	 * @param leaderRetrievalService Leader retrieval service to use.
-	 * @param actorSystem            Actor system to use.
-	 * @param askTimeout             Timeout for JobManager ask-requests.
-	 * @param retryStrategyFactory   Retry strategy if no JobManager available.
-	 */
-	AkkaKvStateLocationLookupService(
-			LeaderRetrievalService leaderRetrievalService,
-			ActorSystem actorSystem,
-			FiniteDuration askTimeout,
-			LookupRetryStrategyFactory retryStrategyFactory) {
-
-		this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService, "Leader retrieval service");
-		this.actorSystem = Preconditions.checkNotNull(actorSystem, "Actor system");
-		this.askTimeout = Preconditions.checkNotNull(askTimeout, "Ask Timeout");
-		this.retryStrategyFactory = Preconditions.checkNotNull(retryStrategyFactory, "Retry strategy factory");
-	}
-
-	public void start() {
-		try {
-			leaderRetrievalService.start(this);
-		} catch (Exception e) {
-			LOG.error("Failed to start leader retrieval service", e);
-			throw new RuntimeException(e);
-		}
-	}
-
-	public void shutDown() {
-		try {
-			leaderRetrievalService.stop();
-		} catch (Exception e) {
-			LOG.error("Failed to stop leader retrieval service", e);
-			throw new RuntimeException(e);
-		}
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public Future<KvStateLocation> getKvStateLookupInfo(final JobID jobId, final String registrationName) {
-		return getKvStateLookupInfo(jobId, registrationName, retryStrategyFactory.createRetryStrategy());
-	}
-
-	/**
-	 * Returns a future holding the {@link KvStateLocation} for the given job
-	 * and KvState registration name.
-	 *
-	 * <p>If there is currently no JobManager registered with the service, the
-	 * request is retried. The retry behaviour is specified by the
-	 * {@link LookupRetryStrategy} of the lookup service.
-	 *
-	 * @param jobId               JobID the KvState instance belongs to
-	 * @param registrationName    Name under which the KvState has been registered
-	 * @param lookupRetryStrategy Retry strategy to use for retries on UnknownJobManager failures.
-	 * @return Future holding the {@link KvStateLocation}
-	 */
-	@SuppressWarnings("unchecked")
-	private Future<KvStateLocation> getKvStateLookupInfo(
-			final JobID jobId,
-			final String registrationName,
-			final LookupRetryStrategy lookupRetryStrategy) {
-
-		return jobManagerFuture
-				.flatMap(new Mapper<ActorGateway, Future<Object>>() {
-					@Override
-					public Future<Object> apply(ActorGateway jobManager) {
-						// Lookup the KvStateLocation
-						Object msg = new KvStateMessage.LookupKvStateLocation(jobId, registrationName);
-						return jobManager.ask(msg, askTimeout);
-					}
-				}, actorSystem.dispatcher())
-				.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
-				.recoverWith(new Recover<Future<KvStateLocation>>() {
-					@Override
-					public Future<KvStateLocation> recover(Throwable failure) throws Throwable {
-						// If the Future fails with UnknownJobManager, retry
-						// the request. Otherwise all Futures will be failed
-						// during the start up phase, when the JobManager did
-						// not notify this service yet or leadership is lost
-						// intermittently.
-						if (failure instanceof UnknownJobManager && lookupRetryStrategy.tryRetry()) {
-							return Patterns.after(
-									lookupRetryStrategy.getRetryDelay(),
-									actorSystem.scheduler(),
-									actorSystem.dispatcher(),
-									new Callable<Future<KvStateLocation>>() {
-										@Override
-										public Future<KvStateLocation> call() throws Exception {
-											return getKvStateLookupInfo(
-													jobId,
-													registrationName,
-													lookupRetryStrategy);
-										}
-									});
-						} else {
-							return Futures.failed(failure);
-						}
-					}
-				}, actorSystem.dispatcher());
-	}
-
-	@Override
-	public void notifyLeaderAddress(String leaderAddress, final UUID leaderSessionID) {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Received leader address notification {}:{}", leaderAddress, leaderSessionID);
-		}
-
-		if (leaderAddress == null) {
-			jobManagerFuture = UNKNOWN_JOB_MANAGER;
-		} else {
-			jobManagerFuture = AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, askTimeout)
-					.map(new Mapper<ActorRef, ActorGateway>() {
-						@Override
-						public ActorGateway apply(ActorRef actorRef) {
-							return new AkkaActorGateway(actorRef, leaderSessionID);
-						}
-					}, actorSystem.dispatcher());
-		}
-	}
-
-	@Override
-	public void handleError(Exception exception) {
-		jobManagerFuture = Futures.failed(exception);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Retry strategy for failed lookups.
-	 *
-	 * <p>Usage:
-	 * <pre>
-	 * LookupRetryStrategy retryStrategy = LookupRetryStrategyFactory.create();
-	 *
-	 * if (retryStrategy.tryRetry()) {
-	 *     // OK to retry
-	 *     FiniteDuration retryDelay = retryStrategy.getRetryDelay();
-	 * }
-	 * </pre>
-	 */
-	interface LookupRetryStrategy {
-
-		/**
-		 * Returns the current retry.
-		 *
-		 * @return Current retry delay.
-		 */
-		FiniteDuration getRetryDelay();
-
-		/**
-		 * Tries another retry and returns whether it is allowed or not.
-		 *
-		 * @return Whether it is allowed to do another restart or not.
-		 */
-		boolean tryRetry();
-
-	}
-
-	/**
-	 * Factory for retry strategies.
-	 */
-	interface LookupRetryStrategyFactory {
-
-		/**
-		 * Creates a new retry strategy.
-		 *
-		 * @return The retry strategy.
-		 */
-		LookupRetryStrategy createRetryStrategy();
-
-	}
-
-	/**
-	 * Factory for disabled retries.
-	 */
-	static class DisabledLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
-
-		private static final DisabledLookupRetryStrategy RETRY_STRATEGY = new DisabledLookupRetryStrategy();
-
-		@Override
-		public LookupRetryStrategy createRetryStrategy() {
-			return RETRY_STRATEGY;
-		}
-
-		private static class DisabledLookupRetryStrategy implements LookupRetryStrategy {
-
-			@Override
-			public FiniteDuration getRetryDelay() {
-				return FiniteDuration.Zero();
-			}
-
-			@Override
-			public boolean tryRetry() {
-				return false;
-			}
-		}
-
-	}
-
-	/**
-	 * Factory for fixed delay retries.
-	 */
-	static class FixedDelayLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
-
-		private final int maxRetries;
-		private final FiniteDuration retryDelay;
-
-		FixedDelayLookupRetryStrategyFactory(int maxRetries, FiniteDuration retryDelay) {
-			this.maxRetries = maxRetries;
-			this.retryDelay = retryDelay;
-		}
-
-		@Override
-		public LookupRetryStrategy createRetryStrategy() {
-			return new FixedDelayLookupRetryStrategy(maxRetries, retryDelay);
-		}
-
-		private static class FixedDelayLookupRetryStrategy implements LookupRetryStrategy {
-
-			private final Object retryLock = new Object();
-			private final int maxRetries;
-			private final FiniteDuration retryDelay;
-			private int numRetries;
-
-			public FixedDelayLookupRetryStrategy(int maxRetries, FiniteDuration retryDelay) {
-				Preconditions.checkArgument(maxRetries >= 0, "Negative number maximum retries");
-				this.maxRetries = maxRetries;
-				this.retryDelay = Preconditions.checkNotNull(retryDelay, "Retry delay");
-			}
-
-			@Override
-			public FiniteDuration getRetryDelay() {
-				synchronized (retryLock) {
-					return retryDelay;
-				}
-			}
-
-			@Override
-			public boolean tryRetry() {
-				synchronized (retryLock) {
-					if (numRetries < maxRetries) {
-						numRetries++;
-						return true;
-					} else {
-						return false;
-					}
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
index 86d1838..8a213bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
@@ -31,7 +31,7 @@ import java.util.Arrays;
  * Location information for all key groups of a {@link InternalKvState} instance.
  *
  * <p>This is populated by the {@link KvStateLocationRegistry} and used by the
- * {@link QueryableStateClient} to target queries.
+ * Queryable State Client to target queries.
  */
 public class KvStateLocation implements Serializable {
 
@@ -166,7 +166,7 @@ public class KvStateLocation implements Serializable {
 	 * @param kvStateAddress Server address of the KvState instance at the key group index.
 	 * @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups
 	 */
-	void registerKvState(KeyGroupRange keyGroupRange, KvStateID kvStateId, KvStateServerAddress kvStateAddress) {
+	public void registerKvState(KeyGroupRange keyGroupRange, KvStateID kvStateId, KvStateServerAddress kvStateAddress) {
 
 		if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
 			throw new IndexOutOfBoundsException("Key group index");
@@ -183,6 +183,10 @@ public class KvStateLocation implements Serializable {
 		}
 	}
 
+	public static long getSerialVersionUID() {
+		return serialVersionUID;
+	}
+
 	/**
 	 * Registers a KvState instance for the given key group index.
 	 *
@@ -190,7 +194,7 @@ public class KvStateLocation implements Serializable {
 	 * @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups
 	 * @throws IllegalArgumentException  If no location information registered for a key group index in the range.
 	 */
-	void unregisterKvState(KeyGroupRange keyGroupRange) {
+	public void unregisterKvState(KeyGroupRange keyGroupRange) {
 		if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
 			throw new IndexOutOfBoundsException("Key group index");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
deleted file mode 100644
index dfd9c14..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-import org.apache.flink.api.common.JobID;
-
-import scala.concurrent.Future;
-
-/**
- * {@link KvStateLocation} lookup service.
- */
-public interface KvStateLocationLookupService {
-
-	/**
-	 * Starts the lookup service.
-	 */
-	void start();
-
-	/**
-	 * Shuts down the lookup service.
-	 */
-	void shutDown();
-
-	/**
-	 * Returns a future holding the {@link KvStateLocation} for the given job
-	 * and KvState registration name.
-	 *
-	 * @param jobId            JobID the KvState instance belongs to
-	 * @param registrationName Name under which the KvState has been registered
-	 * @return Future holding the {@link KvStateLocation}
-	 */
-	Future<KvStateLocation> getKvStateLookupInfo(JobID jobId, String registrationName);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index 26b700c..90fa5cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskmanager.Task;

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
new file mode 100644
index 0000000..9b14c49
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+/**
+ * An interface for the Queryable State Server running on each Task Manager in the cluster.
+ * This server is responsible for serving requests coming from the Queryable State Client and
+ * requesting <b>locally</b> stored state.
+ */
+public interface KvStateServer {
+
+	/**
+	 * Returns the address of this server.
+	 *
+	 * @return Server address
+	 */
+	KvStateServerAddress getAddress();
+
+
+	/** Starts the proxy. */
+	void start() throws InterruptedException;
+
+	/**
+	 * Shuts down the server and all related thread pools.
+	 */
+	void shutDown();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
index 9ec25bc..2599855 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.query;
 
-import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
@@ -88,4 +87,9 @@ public class KvStateServerAddress implements Serializable {
 		result = 31 * result + port;
 		return result;
 	}
+
+	@Override
+	public String toString() {
+		return hostAddress.getHostName() + ':' + port;
+	}
 }