You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/09 14:47:43 UTC

[09/10] flink git commit: [FLINK-3779] [runtime] Add KvState network client and server

[FLINK-3779] [runtime] Add KvState network client and server

- Adds a Netty-based server and client to query KvState instances, which have
  been published to the KvStateRegistry.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af07eed8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af07eed8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af07eed8

Branch: refs/heads/master
Commit: af07eed8e9bed76e54aab04fccc9c424b1e02fa0
Parents: 63c9b8e
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon May 30 14:00:49 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Aug 9 16:42:05 2016 +0200

----------------------------------------------------------------------
 .../io/network/netty/NettyBufferPool.java       |   2 +-
 .../flink/runtime/jobgraph/JobVertexID.java     |  11 +-
 .../apache/flink/runtime/query/KvStateID.java   |  41 ++
 .../runtime/query/KvStateServerAddress.java     |  87 +++
 .../query/netty/AtomicKvStateRequestStats.java  |  94 +++
 .../runtime/query/netty/ChunkedByteBuf.java     |  97 +++
 .../netty/DisabledKvStateRequestStats.java      |  45 ++
 .../runtime/query/netty/KvStateClient.java      | 575 +++++++++++++++
 .../query/netty/KvStateClientHandler.java       | 104 +++
 .../netty/KvStateClientHandlerCallback.java     |  54 ++
 .../query/netty/KvStateRequestStats.java        |  53 ++
 .../runtime/query/netty/KvStateServer.java      | 243 +++++++
 .../query/netty/KvStateServerHandler.java       | 301 ++++++++
 .../query/netty/UnknownKeyOrNamespace.java      |  31 +
 .../runtime/query/netty/UnknownKvStateID.java   |  35 +
 .../query/netty/message/KvStateRequest.java     |  89 +++
 .../netty/message/KvStateRequestFailure.java    |  68 ++
 .../netty/message/KvStateRequestResult.java     |  74 ++
 .../netty/message/KvStateRequestSerializer.java | 518 +++++++++++++
 .../query/netty/message/KvStateRequestType.java |  40 ++
 .../flink/runtime/query/netty/package-info.java |  80 +++
 .../runtime/util/DataInputDeserializer.java     |   8 +
 .../query/netty/KvStateClientHandlerTest.java   | 110 +++
 .../runtime/query/netty/KvStateClientTest.java  | 718 +++++++++++++++++++
 .../query/netty/KvStateServerHandlerTest.java   | 622 ++++++++++++++++
 .../runtime/query/netty/KvStateServerTest.java  | 174 +++++
 .../message/KvStateRequestSerializerTest.java   | 258 +++++++
 27 files changed, 4525 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
index 6d09f26..4a88b34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
@@ -57,7 +57,7 @@ public class NettyBufferPool implements ByteBufAllocator {
 	 * @param numberOfArenas Number of arenas (recommended: 2 * number of task
 	 *                       slots)
 	 */
-	NettyBufferPool(int numberOfArenas) {
+	public NettyBufferPool(int numberOfArenas) {
 		checkArgument(numberOfArenas >= 1, "Number of arenas");
 		this.numberOfArenas = numberOfArenas;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
index 514aabc..1f78b21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.jobgraph;
 
-import javax.xml.bind.DatatypeConverter;
-
 import org.apache.flink.util.AbstractID;
 
+import javax.xml.bind.DatatypeConverter;
+
 /**
  * A class for statistically unique job vertex IDs.
  */
@@ -32,15 +32,14 @@ public class JobVertexID extends AbstractID {
 	public JobVertexID() {
 		super();
 	}
+	public JobVertexID(byte[] bytes) {
+		super(bytes);
+	}
 
 	public JobVertexID(long lowerPart, long upperPart) {
 		super(lowerPart, upperPart);
 	}
 
-	public JobVertexID(byte[] bytes) {
-		super(bytes);
-	}
-	
 	public static JobVertexID fromHexString(String hexString) {
 		return new JobVertexID(DatatypeConverter.parseHexBinary(hexString));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java
new file mode 100644
index 0000000..bb05833
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.util.AbstractID;
+
+/**
+ * Identifier for {@link KvState} instances.
+ *
+ * <p>Assigned when registering state at the {@link KvStateRegistry}.
+ */
+public class KvStateID extends AbstractID {
+
+	private static final long serialVersionUID = 1L;
+
+	public KvStateID() {
+		super();
+	}
+
+	public KvStateID(long lowerPart, long upperPart) {
+		super(lowerPart, upperPart);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
new file mode 100644
index 0000000..7887ed1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+
+/**
+ * The (host, port)-address of a {@link KvStateServer}.
+ */
+public class KvStateServerAddress implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/** KvStateServer host address. */
+	private final InetAddress hostAddress;
+
+	/** KvStateServer port. */
+	private final int port;
+
+	/**
+	 * Creates a KvStateServerAddress for the given KvStateServer host address
+	 * and port.
+	 *
+	 * @param hostAddress KvStateServer host address
+	 * @param port        KvStateServer port
+	 */
+	public KvStateServerAddress(InetAddress hostAddress, int port) {
+		this.hostAddress = Preconditions.checkNotNull(hostAddress, "Host address");
+		Preconditions.checkArgument(port > 0 && port <= 65535, "Port " + port + " is out of range 1-65535");
+		this.port = port;
+	}
+
+	/**
+	 * Returns the host address of the KvStateServer.
+	 *
+	 * @return KvStateServer host address
+	 */
+	public InetAddress getHost() {
+		return hostAddress;
+	}
+
+	/**
+	 * Returns the port of the KvStateServer.
+	 *
+	 * @return KvStateServer port
+	 */
+	public int getPort() {
+		return port;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) { return true; }
+		if (o == null || getClass() != o.getClass()) { return false; }
+
+		KvStateServerAddress that = (KvStateServerAddress) o;
+
+		return port == that.port && hostAddress.equals(that.hostAddress);
+	}
+
+	@Override
+	public int hashCode() {
+		int result = hostAddress.hashCode();
+		result = 31 * result + port;
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java
new file mode 100644
index 0000000..2fca4a8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Atomic {@link KvStateRequestStats} implementation.
+ */
+public class AtomicKvStateRequestStats implements KvStateRequestStats {
+
+	/**
+	 * Number of active connections.
+	 */
+	private final AtomicLong numConnections = new AtomicLong();
+
+	/**
+	 * Total number of reported requests.
+	 */
+	private final AtomicLong numRequests = new AtomicLong();
+
+	/**
+	 * Total number of successful requests (<= reported requests).
+	 */
+	private final AtomicLong numSuccessful = new AtomicLong();
+
+	/**
+	 * Total duration of all successful requests.
+	 */
+	private final AtomicLong successfulDuration = new AtomicLong();
+
+	/**
+	 * Total number of failed requests (<= reported requests).
+	 */
+	private final AtomicLong numFailed = new AtomicLong();
+
+	@Override
+	public void reportActiveConnection() {
+		numConnections.incrementAndGet();
+	}
+
+	@Override
+	public void reportInactiveConnection() {
+		numConnections.decrementAndGet();
+	}
+
+	@Override
+	public void reportRequest() {
+		numRequests.incrementAndGet();
+	}
+
+	@Override
+	public void reportSuccessfulRequest(long durationTotalMillis) {
+		numSuccessful.incrementAndGet();
+		successfulDuration.addAndGet(durationTotalMillis);
+	}
+
+	@Override
+	public void reportFailedRequest() {
+		numFailed.incrementAndGet();
+	}
+
+	public long getNumConnections() {
+		return numConnections.get();
+	}
+
+	public long getNumRequests() {
+		return numRequests.get();
+	}
+
+	public long getNumSuccessful() {
+		return numSuccessful.get();
+	}
+
+	public long getNumFailed() {
+		return numFailed.get();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java
new file mode 100644
index 0000000..6d32489
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedInput;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link ByteBuf} instance to be consumed in chunks by {@link ChunkedWriteHandler},
+ * respecting the high and low watermarks.
+ *
+ * @see <a href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High Watermarks</a>
+ */
+class ChunkedByteBuf implements ChunkedInput<ByteBuf> {
+
+	/** The buffer to chunk */
+	private final ByteBuf buf;
+
+	/** Size of chunks */
+	private final int chunkSize;
+
+	/** Closed flag */
+	private boolean isClosed;
+
+	/** End of input flag */
+	private boolean isEndOfInput;
+
+	public ChunkedByteBuf(ByteBuf buf, int chunkSize) {
+		this.buf = Preconditions.checkNotNull(buf, "Buffer");
+		Preconditions.checkArgument(chunkSize > 0, "Non-positive chunk size");
+		this.chunkSize = chunkSize;
+	}
+
+	@Override
+	public boolean isEndOfInput() throws Exception {
+		return isClosed || isEndOfInput;
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (!isClosed) {
+			// If we did not consume the whole buffer yet, we have to release
+			// it here. Otherwise, it's the responsibility of the consumer.
+			if (!isEndOfInput) {
+				buf.release();
+			}
+
+			isClosed = true;
+		}
+	}
+
+	@Override
+	public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+		if (isClosed) {
+			return null;
+		} else if (buf.readableBytes() <= chunkSize) {
+			isEndOfInput = true;
+
+			// Don't retain as the consumer is responsible to release it
+			return buf.slice();
+		} else {
+			// Return a chunk sized slice of the buffer. The ref count is
+			// shared with the original buffer. That's why we need to retain
+			// a reference here.
+			return buf.readSlice(chunkSize).retain();
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "ChunkedByteBuf{" +
+				"buf=" + buf +
+				", chunkSize=" + chunkSize +
+				", isClosed=" + isClosed +
+				", isEndOfInput=" + isEndOfInput +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java
new file mode 100644
index 0000000..de8824d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+/**
+ * Disabled {@link KvStateRequestStats} implementation.
+ */
+public class DisabledKvStateRequestStats implements KvStateRequestStats {
+
+	@Override
+	public void reportActiveConnection() {
+	}
+
+	@Override
+	public void reportInactiveConnection() {
+	}
+
+	@Override
+	public void reportRequest() {
+	}
+
+	@Override
+	public void reportSuccessfulRequest(long durationTotalMillis) {
+	}
+
+	@Override
+	public void reportFailedRequest() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
new file mode 100644
index 0000000..6cfe86b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import akka.dispatch.Futures;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Netty-based client querying {@link KvStateServer} instances.
+ *
+ * <p>This client can be used by multiple threads concurrently. Operations are
+ * executed asynchronously and return Futures to their result.
+ *
+ * <p>The incoming pipeline looks as follows:
+ * <pre>
+ * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateServerHandler
+ * </pre>
+ *
+ * <p>Received binary messages are expected to contain a frame length field. Netty's
+ * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame before
+ * giving it to our {@link KvStateClientHandler}.
+ *
+ * <p>Connections are established and closed by the client. The server only
+ * closes the connection on a fatal failure that cannot be recovered.
+ */
+public class KvStateClient {
+
+	/** Netty's Bootstrap. */
+	private final Bootstrap bootstrap;
+
+	/** Statistics tracker */
+	private final KvStateRequestStats stats;
+
+	/** Established connections. */
+	private final ConcurrentHashMap<KvStateServerAddress, EstablishedConnection> establishedConnections =
+			new ConcurrentHashMap<>();
+
+	/** Pending connections. */
+	private final ConcurrentHashMap<KvStateServerAddress, PendingConnection> pendingConnections =
+			new ConcurrentHashMap<>();
+
+	/** Atomic shut down flag. */
+	private final AtomicBoolean shutDown = new AtomicBoolean();
+
+	/**
+	 * Creates a client with the specified number of event loop threads.
+	 *
+	 * @param numEventLoopThreads Number of event loop threads (minimum 1).
+	 */
+	public KvStateClient(int numEventLoopThreads, KvStateRequestStats stats) {
+		Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads.");
+		NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
+
+		ThreadFactory threadFactory = new ThreadFactoryBuilder()
+				.setDaemon(true)
+				.setNameFormat("Flink KvStateClient Event Loop Thread %d")
+				.build();
+
+		NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
+
+		this.bootstrap = new Bootstrap()
+				.group(nioGroup)
+				.channel(NioSocketChannel.class)
+				.option(ChannelOption.ALLOCATOR, bufferPool)
+				.handler(new ChannelInitializer<SocketChannel>() {
+					@Override
+					protected void initChannel(SocketChannel ch) throws Exception {
+						ch.pipeline()
+								.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
+								// ChunkedWriteHandler respects Channel writability
+								.addLast(new ChunkedWriteHandler());
+					}
+				});
+
+		this.stats = Preconditions.checkNotNull(stats, "Statistics tracker");
+	}
+
+	/**
+	 * Returns a future holding the serialized request result.
+	 *
+	 * <p>If the server does not serve a KvState instance with the given ID,
+	 * the Future will be failed with a {@link UnknownKvStateID}.
+	 *
+	 * <p>If the KvState instance does not hold any data for the given key
+	 * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
+	 *
+	 * <p>All other failures are forwarded to the Future.
+	 *
+	 * @param serverAddress Address of the server to query
+	 * @param kvStateId ID of the KvState instance to query
+	 * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance with
+	 * @return Future holding the serialized result
+	 */
+	public Future<byte[]> getKvState(
+			KvStateServerAddress serverAddress,
+			KvStateID kvStateId,
+			byte[] serializedKeyAndNamespace) {
+
+		if (shutDown.get()) {
+			return Futures.failed(new IllegalStateException("Shut down"));
+		}
+
+		EstablishedConnection connection = establishedConnections.get(serverAddress);
+
+		if (connection != null) {
+			return connection.getKvState(kvStateId, serializedKeyAndNamespace);
+		} else {
+			PendingConnection pendingConnection = pendingConnections.get(serverAddress);
+			if (pendingConnection != null) {
+				// There was a race, use the existing pending connection.
+				return pendingConnection.getKvState(kvStateId, serializedKeyAndNamespace);
+			} else {
+				// We try to connect to the server.
+				PendingConnection pending = new PendingConnection(serverAddress);
+				PendingConnection previous = pendingConnections.putIfAbsent(serverAddress, pending);
+
+				if (previous == null) {
+					// OK, we are responsible to connect.
+					bootstrap.connect(serverAddress.getHost(), serverAddress.getPort())
+							.addListener(pending);
+
+					return pending.getKvState(kvStateId, serializedKeyAndNamespace);
+				} else {
+					// There was a race, use the existing pending connection.
+					return previous.getKvState(kvStateId, serializedKeyAndNamespace);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Shuts down the client and closes all connections.
+	 *
+	 * <p>After a call to this method, all returned futures will be failed.
+	 */
+	public void shutDown() {
+		if (shutDown.compareAndSet(false, true)) {
+			for (Map.Entry<KvStateServerAddress, EstablishedConnection> conn : establishedConnections.entrySet()) {
+				if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
+					conn.getValue().close();
+				}
+			}
+
+			for (Map.Entry<KvStateServerAddress, PendingConnection> conn : pendingConnections.entrySet()) {
+				if (pendingConnections.remove(conn.getKey()) != null) {
+					conn.getValue().close();
+				}
+			}
+
+			if (bootstrap != null) {
+				EventLoopGroup group = bootstrap.group();
+				if (group != null) {
+					group.shutdownGracefully();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Closes the connection to the given server address if it exists.
+	 *
+	 * <p>If there is a request to the server a new connection will be established.
+	 *
+	 * @param serverAddress Target address of the connection to close
+	 */
+	public void closeConnection(KvStateServerAddress serverAddress) {
+		PendingConnection pending = pendingConnections.get(serverAddress);
+		if (pending != null) {
+			pending.close();
+		}
+
+		EstablishedConnection established = establishedConnections.remove(serverAddress);
+		if (established != null) {
+			established.close();
+		}
+	}
+
+	/**
+	 * A pending connection that is in the process of connecting.
+	 */
+	private class PendingConnection implements ChannelFutureListener {
+
+		/** Lock to guard the connect call, channel hand in, etc. */
+		private final Object connectLock = new Object();
+
+		/** Address of the server we are connecting to. */
+		private final KvStateServerAddress serverAddress;
+
+		/** Queue of requests while connecting. */
+		private final ArrayDeque<PendingRequest> queuedRequests = new ArrayDeque<>();
+
+		/** The established connection after the connect succeeds. */
+		private EstablishedConnection established;
+
+		/** Closed flag. */
+		private boolean closed;
+
+		/** Failure cause if something goes wrong. */
+		private Throwable failureCause;
+
+		/**
+		 * Creates a pending connection to the given server.
+		 *
+		 * @param serverAddress Address of the server to connect to.
+		 */
+		private PendingConnection(KvStateServerAddress serverAddress) {
+			this.serverAddress = serverAddress;
+		}
+
+		@Override
+		public void operationComplete(ChannelFuture future) throws Exception {
+			// Callback from the Bootstrap's connect call.
+			if (future.isSuccess()) {
+				handInChannel(future.channel());
+			} else {
+				close(future.cause());
+			}
+		}
+
+		/**
+		 * Returns a future holding the serialized request result.
+		 *
+		 * <p>If the channel has been established, forward the call to the
+		 * established channel, otherwise queue it for when the channel is
+		 * handed in.
+		 *
+		 * @param kvStateId                 ID of the KvState instance to query
+		 * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance
+		 *                                  with
+		 * @return Future holding the serialized result
+		 */
+		public Future<byte[]> getKvState(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
+			synchronized (connectLock) {
+				if (failureCause != null) {
+					return Futures.failed(failureCause);
+				} else if (closed) {
+					return Futures.failed(new ClosedChannelException());
+				} else {
+					if (established != null) {
+						return established.getKvState(kvStateId, serializedKeyAndNamespace);
+					} else {
+						// Queue this and handle when connected
+						PendingRequest pending = new PendingRequest(kvStateId, serializedKeyAndNamespace);
+						queuedRequests.add(pending);
+						return pending.promise.future();
+					}
+				}
+			}
+		}
+
+		/**
+		 * Hands in a channel after a successful connection.
+		 *
+		 * @param channel Channel to hand in
+		 */
+		private void handInChannel(Channel channel) {
+			synchronized (connectLock) {
+				if (closed || failureCause != null) {
+					// Close the channel and we are done. Any queued requests
+					// are removed on the close/failure call and after that no
+					// new ones can be enqueued.
+					channel.close();
+				} else {
+					established = new EstablishedConnection(serverAddress, channel);
+
+					PendingRequest pending;
+					while ((pending = queuedRequests.poll()) != null) {
+						Future<byte[]> resultFuture = established.getKvState(
+								pending.kvStateId,
+								pending.serializedKeyAndNamespace);
+
+						pending.promise.completeWith(resultFuture);
+					}
+
+					// Publish the channel for the general public
+					establishedConnections.put(serverAddress, established);
+					pendingConnections.remove(serverAddress);
+
+					// Check shut down for possible race with shut down. We
+					// don't want any lingering connections after shut down,
+					// which can happen if we don't check this here.
+					if (shutDown.get()) {
+						if (establishedConnections.remove(serverAddress, established)) {
+							established.close();
+						}
+					}
+				}
+			}
+		}
+
+		/**
+		 * Close the connecting channel with a ClosedChannelException.
+		 */
+		private void close() {
+			close(new ClosedChannelException());
+		}
+
+		/**
+		 * Close the connecting channel with an Exception (can be
+		 * <code>null</code>) or forward to the established channel.
+		 */
+		private void close(Throwable cause) {
+			synchronized (connectLock) {
+				if (!closed) {
+					if (failureCause == null) {
+						failureCause = cause;
+					}
+
+					if (established != null) {
+						established.close();
+					} else {
+						PendingRequest pending;
+						while ((pending = queuedRequests.poll()) != null) {
+							pending.promise.tryFailure(cause);
+						}
+					}
+
+					closed = true;
+				}
+			}
+		}
+
+		/**
+		 * A pending request queued while the channel is connecting.
+		 */
+		private final class PendingRequest {
+
+			private final KvStateID kvStateId;
+			private final byte[] serializedKeyAndNamespace;
+			private final Promise<byte[]> promise;
+
+			private PendingRequest(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
+				this.kvStateId = kvStateId;
+				this.serializedKeyAndNamespace = serializedKeyAndNamespace;
+				this.promise = Futures.promise();
+			}
+		}
+
+		@Override
+		public String toString() {
+			synchronized (connectLock) {
+				return "PendingConnection{" +
+						"serverAddress=" + serverAddress +
+						", queuedRequests=" + queuedRequests.size() +
+						", established=" + (established != null) +
+						", closed=" + closed +
+						'}';
+			}
+		}
+	}
+
+	/**
+	 * An established connection that wraps the actual channel instance and is
+	 * registered at the {@link KvStateClientHandler} for callbacks.
+	 */
+	private class EstablishedConnection implements KvStateClientHandlerCallback {
+
+		/** Address of the server we are connected to. */
+		private final KvStateServerAddress serverAddress;
+
+		/** The actual TCP channel. */
+		private final Channel channel;
+
+		/** Pending requests keyed by request ID. */
+		private final ConcurrentHashMap<Long, PromiseAndTimestamp> pendingRequests = new ConcurrentHashMap<>();
+
+		/** Current request number used to assign unique request IDs. */
+		private final AtomicLong requestCount = new AtomicLong();
+
+		/** Reference to a failure that was reported by the channel. */
+		private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
+
+		/**
+		 * Creates an established connection with the given channel.
+		 *
+		 * @param serverAddress Address of the server connected to
+		 * @param channel The actual TCP channel
+		 */
+		EstablishedConnection(KvStateServerAddress serverAddress, Channel channel) {
+			this.serverAddress = Preconditions.checkNotNull(serverAddress, "KvStateServerAddress");
+			this.channel = Preconditions.checkNotNull(channel, "Channel");
+
+			// Add the client handler with the callback
+			channel.pipeline().addLast("KvStateClientHandler", new KvStateClientHandler(this));
+
+			stats.reportActiveConnection();
+		}
+
+		/**
+		 * Close the channel with a ClosedChannelException.
+		 */
+		void close() {
+			close(new ClosedChannelException());
+		}
+
+		/**
+		 * Close the channel with a cause.
+		 *
+		 * @param cause The cause to close the channel with.
+		 * @return Channel close future
+		 */
+		private boolean close(Throwable cause) {
+			if (failureCause.compareAndSet(null, cause)) {
+				channel.close();
+				stats.reportInactiveConnection();
+
+				for (long requestId : pendingRequests.keySet()) {
+					PromiseAndTimestamp pending = pendingRequests.remove(requestId);
+					if (pending != null && pending.promise.tryFailure(cause)) {
+						stats.reportFailedRequest();
+					}
+				}
+
+				return true;
+			}
+
+			return false;
+		}
+
+		/**
+		 * Returns a future holding the serialized request result.
+		 *
+		 * @param kvStateId                 ID of the KvState instance to query
+		 * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance
+		 *                                  with
+		 * @return Future holding the serialized result
+		 */
+		Future<byte[]> getKvState(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
+			PromiseAndTimestamp requestPromiseTs = new PromiseAndTimestamp(
+					Futures.<byte[]>promise(),
+					System.nanoTime());
+
+			try {
+				final long requestId = requestCount.getAndIncrement();
+				pendingRequests.put(requestId, requestPromiseTs);
+
+				stats.reportRequest();
+
+				ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequest(
+						channel.alloc(),
+						requestId,
+						kvStateId,
+						serializedKeyAndNamespace);
+
+				channel.writeAndFlush(buf).addListener(new ChannelFutureListener() {
+					@Override
+					public void operationComplete(ChannelFuture future) throws Exception {
+						if (!future.isSuccess()) {
+							// Fail promise if not failed to write
+							PromiseAndTimestamp pending = pendingRequests.remove(requestId);
+							if (pending != null && pending.promise.tryFailure(future.cause())) {
+								stats.reportFailedRequest();
+							}
+						}
+					}
+				});
+
+				// Check failure for possible race. We don't want any lingering
+				// promises after a failure, which can happen if we don't check
+				// this here. Note that close is treated as a failure as well.
+				Throwable failure = failureCause.get();
+				if (failure != null) {
+					// Remove from pending requests to guard against concurrent
+					// removal and to make sure that we only count it once as failed.
+					PromiseAndTimestamp p = pendingRequests.remove(requestId);
+					if (p != null && p.promise.tryFailure(failure)) {
+						stats.reportFailedRequest();
+					}
+				}
+			} catch (Throwable t) {
+				requestPromiseTs.promise.tryFailure(t);
+			}
+
+			return requestPromiseTs.promise.future();
+		}
+
+		@Override
+		public void onRequestResult(long requestId, byte[] serializedValue) {
+			PromiseAndTimestamp pending = pendingRequests.remove(requestId);
+			if (pending != null && pending.promise.trySuccess(serializedValue)) {
+				long durationMillis = (System.nanoTime() - pending.timestamp) / 1_000_000;
+				stats.reportSuccessfulRequest(durationMillis);
+			}
+		}
+
+		@Override
+		public void onRequestFailure(long requestId, Throwable cause) {
+			PromiseAndTimestamp pending = pendingRequests.remove(requestId);
+			if (pending != null && pending.promise.tryFailure(cause)) {
+				stats.reportFailedRequest();
+			}
+		}
+
+		@Override
+		public void onFailure(Throwable cause) {
+			if (close(cause)) {
+				// Remove from established channels, otherwise future
+				// requests will be handled by this failed channel.
+				establishedConnections.remove(serverAddress, this);
+			}
+		}
+
+		@Override
+		public String toString() {
+			return "EstablishedConnection{" +
+					"serverAddress=" + serverAddress +
+					", channel=" + channel +
+					", pendingRequests=" + pendingRequests.size() +
+					", requestCount=" + requestCount +
+					", failureCause=" + failureCause +
+					'}';
+		}
+
+		/**
+		 * Pair of promise and a timestamp.
+		 */
+		private class PromiseAndTimestamp {
+
+			private final Promise<byte[]> promise;
+			private final long timestamp;
+
+			public PromiseAndTimestamp(Promise<byte[]> promise, long timestamp) {
+				this.promise = promise;
+				this.timestamp = timestamp;
+			}
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java
new file mode 100644
index 0000000..2166bf2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestFailure;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * This handler expects responses from {@link KvStateServerHandler}.
+ *
+ * <p>It deserializes the response and calls the registered callback, which is
+ * responsible for actually handling the result (see {@link KvStateClient.EstablishedConnection}).
+ */
+class KvStateClientHandler extends ChannelInboundHandlerAdapter {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KvStateClientHandler.class);
+
+	private final KvStateClientHandlerCallback callback;
+
+	/**
+	 * Creates a {@link KvStateClientHandler} with the callback.
+	 *
+	 * @param callback Callback for responses.
+	 */
+	KvStateClientHandler(KvStateClientHandlerCallback callback) {
+		this.callback = callback;
+	}
+
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+		try {
+			ByteBuf buf = (ByteBuf) msg;
+			KvStateRequestType msgType = KvStateRequestSerializer.deserializeHeader(buf);
+
+			if (msgType == KvStateRequestType.REQUEST_RESULT) {
+				KvStateRequestResult result = KvStateRequestSerializer.deserializeKvStateRequestResult(buf);
+				callback.onRequestResult(result.getRequestId(), result.getSerializedResult());
+			} else if (msgType == KvStateRequestType.REQUEST_FAILURE) {
+				KvStateRequestFailure failure = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
+				callback.onRequestFailure(failure.getRequestId(), failure.getCause());
+			} else if (msgType == KvStateRequestType.SERVER_FAILURE) {
+				throw KvStateRequestSerializer.deserializeServerFailure(buf);
+			} else {
+				throw new IllegalStateException("Unexpected response type '" + msgType + "'");
+			}
+		} catch (Throwable t1) {
+			try {
+				callback.onFailure(t1);
+			} catch (Throwable t2) {
+				LOG.error("Failed to notify callback about failure", t2);
+			}
+		} finally {
+			ReferenceCountUtil.release(msg);
+		}
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		try {
+			callback.onFailure(cause);
+		} catch (Throwable t) {
+			LOG.error("Failed to notify callback about failure", t);
+		}
+	}
+
+	@Override
+	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+		// Only the client is expected to close the channel. Otherwise it
+		// indicates a failure. Note that this will be invoked in both cases
+		// though. If the callback closed the channel, the callback must be
+		// ignored.
+		try {
+			callback.onFailure(new ClosedChannelException());
+		} catch (Throwable t) {
+			LOG.error("Failed to notify callback about failure", t);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java
new file mode 100644
index 0000000..65ff781
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import org.apache.flink.runtime.query.netty.message.KvStateRequest;
+
+/**
+ * Callback for {@link KvStateClientHandler}.
+ */
+interface KvStateClientHandlerCallback {
+
+	/**
+	 * Called on a successful {@link KvStateRequest}.
+	 *
+	 * @param requestId       ID of the request
+	 * @param serializedValue Serialized value for the request
+	 */
+	void onRequestResult(long requestId, byte[] serializedValue);
+
+	/**
+	 * Called on a failed {@link KvStateRequest}.
+	 *
+	 * @param requestId ID of the request
+	 * @param cause     Cause of the request failure
+	 */
+	void onRequestFailure(long requestId, Throwable cause);
+
+	/**
+	 * Called on any failure, which is not related to a specific request.
+	 *
+	 * <p>This can be for example a caught Exception in the channel pipeline
+	 * or an unexpected channel close.
+	 *
+	 * @param cause Cause of the failure
+	 */
+	void onFailure(Throwable cause);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
new file mode 100644
index 0000000..1c0d8d5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+/**
+ * Simple statistics for {@link KvStateServer} monitoring.
+ */
+public interface KvStateRequestStats {
+
+	/**
+	 * Reports an active connection.
+	 */
+	void reportActiveConnection();
+
+	/**
+	 * Reports an inactive connection.
+	 */
+	void reportInactiveConnection();
+
+	/**
+	 * Reports an incoming request.
+	 */
+	void reportRequest();
+
+	/**
+	 * Reports a successfully handled request.
+	 *
+	 * @param durationTotalMillis Duration of the request (in milliseconds).
+	 */
+	void reportSuccessfulRequest(long durationTotalMillis);
+
+	/**
+	 * Reports a failure during a request.
+	 */
+	void reportFailedRequest();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
new file mode 100644
index 0000000..0c0c5ec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.concurrent.Future;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.message.KvStateRequest;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Netty-based server answering {@link KvStateRequest} messages.
+ *
+ * <p>Requests are handled by asynchronous query tasks (see {@link KvStateServerHandler.AsyncKvStateQueryTask})
+ * that are executed by a separate query Thread pool. This pool is shared among
+ * all TCP connections.
+ *
+ * <p>The incoming pipeline looks as follows:
+ * <pre>
+ * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateServerHandler
+ * </pre>
+ *
+ * <p>Received binary messages are expected to contain a frame length field. Netty's
+ * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame before
+ * giving it to our {@link KvStateServerHandler}.
+ *
+ * <p>Connections are established and closed by the client. The server only
+ * closes the connection on a fatal failure that cannot be recovered. A
+ * server-side connection close is considered a failure by the client.
+ */
+public class KvStateServer {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KvStateServer.class);
+
+	/** Server config: low water mark */
+	private static final int LOW_WATER_MARK = 8 * 1024;
+
+	/** Server config: high water mark */
+	private static final int HIGH_WATER_MARK = 32 * 1024;
+
+	/** Netty's ServerBootstrap. */
+	private final ServerBootstrap bootstrap;
+
+	/** Query executor thread pool. */
+	private final ExecutorService queryExecutor;
+
+	/** Address of this server. */
+	private KvStateServerAddress serverAddress;
+
+	/**
+	 * Creates the {@link KvStateServer}.
+	 *
+	 * <p>The server needs to be started via {@link #start()} in order to bind
+	 * to the configured bind address.
+	 *
+	 * @param bindAddress         Address to bind to
+	 * @param bindPort            Port to bind to. Pick random port if 0.
+	 * @param numEventLoopThreads Number of event loop threads
+	 * @param numQueryThreads     Number of query threads
+	 * @param kvStateRegistry     KvStateRegistry to query for KvState instances
+	 * @param stats               Statistics tracker
+	 */
+	public KvStateServer(
+			InetAddress bindAddress,
+			int bindPort,
+			int numEventLoopThreads,
+			int numQueryThreads,
+			KvStateRegistry kvStateRegistry,
+			KvStateRequestStats stats) {
+
+		Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65536, "Port " + bindPort +
+				" is out of valid port range (0-65536).");
+
+		Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads.");
+		Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads.");
+
+		Preconditions.checkNotNull(kvStateRegistry, "KvStateRegistry");
+		Preconditions.checkNotNull(stats, "KvStateRequestStats");
+
+		NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
+
+		ThreadFactory threadFactory = new ThreadFactoryBuilder()
+				.setDaemon(true)
+				.setNameFormat("Flink KvStateServer EventLoop Thread %d")
+				.build();
+
+		NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
+
+		queryExecutor = createQueryExecutor(numQueryThreads);
+
+		// Shared between all channels
+		KvStateServerHandler serverHandler = new KvStateServerHandler(
+				kvStateRegistry,
+				queryExecutor,
+				stats);
+
+		bootstrap = new ServerBootstrap()
+				// Bind address and port
+				.localAddress(bindAddress, bindPort)
+				// NIO server channels
+				.group(nioGroup)
+				.channel(NioServerSocketChannel.class)
+				// Server channel Options
+				.option(ChannelOption.ALLOCATOR, bufferPool)
+				// Child channel options
+				.childOption(ChannelOption.ALLOCATOR, bufferPool)
+				.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
+				.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
+				// See initializer for pipeline details
+				.childHandler(new KvStateServerChannelInitializer(serverHandler));
+	}
+
+	/**
+	 * Starts the server by binding to the configured bind address (blocking).
+	 *
+	 * @throws InterruptedException If interrupted during the bind operation
+	 */
+	public void start() throws InterruptedException {
+		Channel channel = bootstrap.bind().sync().channel();
+
+		InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
+		serverAddress = new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
+	}
+
+	/**
+	 * Returns the address of this server.
+	 *
+	 * @return Server address
+	 * @throws IllegalStateException If server has not been started yet
+	 */
+	public KvStateServerAddress getAddress() {
+		if (serverAddress == null) {
+			throw new IllegalStateException("KvStateServer not started yet.");
+		}
+
+		return serverAddress;
+	}
+
+	/**
+	 * Shuts down the server and all related thread pools.
+	 */
+	public void shutDown() {
+		if (bootstrap != null) {
+			EventLoopGroup group = bootstrap.group();
+			if (group != null) {
+				Future<?> shutDownFuture = group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
+				try {
+					shutDownFuture.await();
+				} catch (InterruptedException e) {
+					LOG.error("Interrupted during shut down", e);
+				}
+			}
+		}
+
+		if (queryExecutor != null) {
+			queryExecutor.shutdown();
+		}
+
+		serverAddress = null;
+	}
+
+	/**
+	 * Creates a thread pool for the query execution.
+	 *
+	 * @param numQueryThreads Number of query threads.
+	 * @return Thread pool for query execution
+	 */
+	private static ExecutorService createQueryExecutor(int numQueryThreads) {
+		ThreadFactory threadFactory = new ThreadFactoryBuilder()
+				.setDaemon(true)
+				.setNameFormat("Flink KvStateServer Query Thread %d")
+				.build();
+
+		return Executors.newFixedThreadPool(numQueryThreads, threadFactory);
+	}
+
+	/**
+	 * Channel pipeline initializer.
+	 *
+	 * <p>The request handler is shared, whereas the other handlers are created
+	 * per channel.
+	 */
+	private static final class KvStateServerChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+		/** The shared request handler. */
+		private final KvStateServerHandler sharedRequestHandler;
+
+		/**
+		 * Creates the channel pipeline initializer with the shared request handler.
+		 *
+		 * @param sharedRequestHandler Shared request handler.
+		 */
+		public KvStateServerChannelInitializer(KvStateServerHandler sharedRequestHandler) {
+			this.sharedRequestHandler = Preconditions.checkNotNull(sharedRequestHandler, "Request handler");
+		}
+
+		@Override
+		protected void initChannel(SocketChannel ch) throws Exception {
+			ch.pipeline()
+					.addLast(new ChunkedWriteHandler())
+					.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
+					.addLast(sharedRequestHandler);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
new file mode 100644
index 0000000..47f2ad6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.netty.message.KvStateRequest;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * This handler dispatches asynchronous tasks, which query {@link KvState}
+ * instances and write the result to the channel.
+ *
+ * <p>The network threads receive the message, deserialize it and dispatch the
+ * query task. The actual query is handled in a separate thread as it might
+ * otherwise block the network threads (file I/O etc.).
+ */
+@ChannelHandler.Sharable
+class KvStateServerHandler extends ChannelInboundHandlerAdapter {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class);
+
+	/** KvState registry holding references to the KvState instances. */
+	private final KvStateRegistry registry;
+
+	/** Thread pool for query execution. */
+	private final ExecutorService queryExecutor;
+
+	/** Exposed server statistics. */
+	private final KvStateRequestStats stats;
+
+	/**
+	 * Create the handler.
+	 *
+	 * @param kvStateRegistry Registry to query.
+	 * @param queryExecutor   Thread pool for query execution.
+	 * @param stats           Exposed server statistics.
+	 */
+	KvStateServerHandler(
+			KvStateRegistry kvStateRegistry,
+			ExecutorService queryExecutor,
+			KvStateRequestStats stats) {
+
+		this.registry = Objects.requireNonNull(kvStateRegistry, "KvStateRegistry");
+		this.queryExecutor = Objects.requireNonNull(queryExecutor, "Query thread pool");
+		this.stats = Objects.requireNonNull(stats, "KvStateRequestStats");
+	}
+
+	@Override
+	public void channelActive(ChannelHandlerContext ctx) throws Exception {
+		stats.reportActiveConnection();
+	}
+
+	@Override
+	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+		stats.reportInactiveConnection();
+	}
+
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+		KvStateRequest request = null;
+
+		try {
+			ByteBuf buf = (ByteBuf) msg;
+			KvStateRequestType msgType = KvStateRequestSerializer.deserializeHeader(buf);
+
+			if (msgType == KvStateRequestType.REQUEST) {
+				// ------------------------------------------------------------
+				// Request
+				// ------------------------------------------------------------
+				request = KvStateRequestSerializer.deserializeKvStateRequest(buf);
+
+				stats.reportRequest();
+
+				KvState<?, ?, ?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
+
+				if (kvState != null) {
+					// Execute actual query async, because it is possibly
+					// blocking (e.g. file I/O).
+					//
+					// A submission failure is not treated as fatal.
+					queryExecutor.submit(new AsyncKvStateQueryTask(ctx, request, kvState, stats));
+				} else {
+					ByteBuf unknown = KvStateRequestSerializer.serializeKvStateRequestFailure(
+							ctx.alloc(),
+							request.getRequestId(),
+							new UnknownKvStateID(request.getKvStateId()));
+
+					ctx.writeAndFlush(unknown);
+
+					stats.reportFailedRequest();
+				}
+			} else {
+				// ------------------------------------------------------------
+				// Unexpected
+				// ------------------------------------------------------------
+				ByteBuf failure = KvStateRequestSerializer.serializeServerFailure(
+						ctx.alloc(),
+						new IllegalArgumentException("Unexpected message type " + msgType
+								+ ". KvStateServerHandler expects "
+								+ KvStateRequestType.REQUEST + " messages."));
+
+				ctx.writeAndFlush(failure);
+			}
+		} catch (Throwable t) {
+			String stringifiedCause = ExceptionUtils.stringifyException(t);
+
+			ByteBuf err;
+			if (request != null) {
+				String errMsg = "Failed to handle incoming request with ID " +
+						request.getRequestId() + ". Caused by: " + stringifiedCause;
+				err = KvStateRequestSerializer.serializeKvStateRequestFailure(
+						ctx.alloc(),
+						request.getRequestId(),
+						new RuntimeException(errMsg));
+
+				stats.reportFailedRequest();
+			} else {
+				String errMsg = "Failed to handle incoming message. Caused by: " + stringifiedCause;
+				err = KvStateRequestSerializer.serializeServerFailure(
+						ctx.alloc(),
+						new RuntimeException(errMsg));
+			}
+
+			ctx.writeAndFlush(err);
+		} finally {
+			// IMPORTANT: We have to always recycle the incoming buffer.
+			// Otherwise we will leak memory out of Netty's buffer pool.
+			//
+			// If any operation ever holds on to the buffer, it is the
+			// responsibility of that operation to retain the buffer and
+			// release it later.
+			ReferenceCountUtil.release(msg);
+		}
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		String stringifiedCause = ExceptionUtils.stringifyException(cause);
+		String msg = "Exception in server pipeline. Caused by: " + stringifiedCause;
+
+		ByteBuf err = KvStateRequestSerializer.serializeServerFailure(
+				ctx.alloc(),
+				new RuntimeException(msg));
+
+		ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE);
+	}
+
+	/**
+	 * Task to execute the actual query against the {@link KvState} instance.
+	 */
+	private static class AsyncKvStateQueryTask implements Runnable {
+
+		private final ChannelHandlerContext ctx;
+
+		private final KvStateRequest request;
+
+		private final KvState<?, ?, ?, ?, ?> kvState;
+
+		private final KvStateRequestStats stats;
+
+		private final long creationNanos;
+
+		public AsyncKvStateQueryTask(
+				ChannelHandlerContext ctx,
+				KvStateRequest request,
+				KvState<?, ?, ?, ?, ?> kvState,
+				KvStateRequestStats stats) {
+
+			this.ctx = Objects.requireNonNull(ctx, "Channel handler context");
+			this.request = Objects.requireNonNull(request, "State query");
+			this.kvState = Objects.requireNonNull(kvState, "KvState");
+			this.stats = Objects.requireNonNull(stats, "State query stats");
+			this.creationNanos = System.nanoTime();
+		}
+
+		@Override
+		public void run() {
+			boolean success = false;
+
+			try {
+				if (!ctx.channel().isActive()) {
+					return;
+				}
+
+				// Query the KvState instance
+				byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
+				byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
+
+				if (serializedResult != null) {
+					// We found some data, success!
+					ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequestResult(
+							ctx.alloc(),
+							request.getRequestId(),
+							serializedResult);
+
+					int highWatermark = ctx.channel().config().getWriteBufferHighWaterMark();
+
+					ChannelFuture write;
+					if (buf.readableBytes() <= highWatermark) {
+						write = ctx.writeAndFlush(buf);
+					} else {
+						write = ctx.writeAndFlush(new ChunkedByteBuf(buf, highWatermark));
+					}
+
+					write.addListener(new QueryResultWriteListener());
+
+					success = true;
+				} else {
+					// No data for the key/namespace. This is considered to be
+					// a failure.
+					ByteBuf unknownKey = KvStateRequestSerializer.serializeKvStateRequestFailure(
+							ctx.alloc(),
+							request.getRequestId(),
+							new UnknownKeyOrNamespace());
+
+					ctx.writeAndFlush(unknownKey);
+				}
+			} catch (Throwable t) {
+				try {
+					String stringifiedCause = ExceptionUtils.stringifyException(t);
+					String errMsg = "Failed to query state backend for query " +
+							request.getRequestId() + ". Caused by: " + stringifiedCause;
+
+					ByteBuf err = KvStateRequestSerializer.serializeKvStateRequestFailure(
+							ctx.alloc(), request.getRequestId(), new RuntimeException(errMsg));
+
+					ctx.writeAndFlush(err);
+				} catch (IOException e) {
+					LOG.error("Failed to respond with the error after failed to query state backend", e);
+				}
+			} finally {
+				if (!success) {
+					stats.reportFailedRequest();
+				}
+			}
+		}
+
+		@Override
+		public String toString() {
+			return "AsyncKvStateQueryTask{" +
+					", request=" + request +
+					", creationNanos=" + creationNanos +
+					'}';
+		}
+
+		/**
+		 * Callback after query result has been written.
+		 *
+		 * <p>Gathers stats and logs errors.
+		 */
+		private class QueryResultWriteListener implements ChannelFutureListener {
+
+			@Override
+			public void operationComplete(ChannelFuture future) throws Exception {
+				long durationMillis = (System.nanoTime() - creationNanos) / 1_000_000;
+
+				if (future.isSuccess()) {
+					stats.reportSuccessfulRequest(durationMillis);
+				} else {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Query " + request + " failed after " + durationMillis + " ms", future.cause());
+					}
+
+					stats.reportFailedRequest();
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java
new file mode 100644
index 0000000..4e5a1de
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+/**
+ * Thrown if the KvState does not hold any state for the given key or namespace.
+ */
+public class UnknownKeyOrNamespace extends IllegalStateException {
+
+	private static final long serialVersionUID = 1L;
+
+	UnknownKeyOrNamespace() {
+		super("KvState does not hold any state for key/namespace.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java
new file mode 100644
index 0000000..cc60035
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Thrown if no KvState with the given ID cannot found by the server handler.
+ */
+public class UnknownKvStateID extends IllegalStateException {
+
+	private static final long serialVersionUID = 1L;
+
+	public UnknownKvStateID(KvStateID kvStateId) {
+		super("No KvState registered with ID " + Preconditions.checkNotNull(kvStateId, "KvStateID") +
+				" at TaskManager.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
new file mode 100644
index 0000000..0abb653
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty.message;
+
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link KvState} instance request for a specific key and namespace.
+ */
+public final class KvStateRequest {
+
+	/** ID for this request. */
+	private final long requestId;
+
+	/** ID of the requested KvState instance. */
+	private final KvStateID kvStateId;
+
+	/** Serialized key and namespace to request from the KvState instance. */
+	private final byte[] serializedKeyAndNamespace;
+
+	/**
+	 * Creates a KvState instance request.
+	 *
+	 * @param requestId                 ID for this request
+	 * @param kvStateId                 ID of the requested KvState instance
+	 * @param serializedKeyAndNamespace Serialized key and namespace to request from the KvState
+	 *                                  instance
+	 */
+	KvStateRequest(long requestId, KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
+		this.requestId = requestId;
+		this.kvStateId = Preconditions.checkNotNull(kvStateId, "KvStateID");
+		this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+	}
+
+	/**
+	 * Returns the request ID.
+	 *
+	 * @return Request ID
+	 */
+	public long getRequestId() {
+		return requestId;
+	}
+
+	/**
+	 * Returns the ID of the requested KvState instance.
+	 *
+	 * @return ID of the requested KvState instance
+	 */
+	public KvStateID getKvStateId() {
+		return kvStateId;
+	}
+
+	/**
+	 * Returns the serialized key and namespace to request from the KvState
+	 * instance.
+	 *
+	 * @return Serialized key and namespace to request from the KvState instance
+	 */
+	public byte[] getSerializedKeyAndNamespace() {
+		return serializedKeyAndNamespace;
+	}
+
+	@Override
+	public String toString() {
+		return "KvStateRequest{" +
+				"requestId=" + requestId +
+				", kvStateId=" + kvStateId +
+				", serializedKeyAndNamespace.length=" + serializedKeyAndNamespace.length +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java
new file mode 100644
index 0000000..06a3ce8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty.message;
+
+/**
+ * A failure response to a {@link KvStateRequest}.
+ */
+public final class KvStateRequestFailure {
+
+	/** ID of the request responding to. */
+	private final long requestId;
+
+	/** Failure cause. Not allowed to be a user type. */
+	private final Throwable cause;
+
+	/**
+	 * Creates a failure response to a {@link KvStateRequest}.
+	 *
+	 * @param requestId ID for the request responding to
+	 * @param cause     Failure cause (not allowed to be a user type)
+	 */
+	KvStateRequestFailure(long requestId, Throwable cause) {
+		this.requestId = requestId;
+		this.cause = cause;
+	}
+
+	/**
+	 * Returns the request ID responding to.
+	 *
+	 * @return Request ID responding to
+	 */
+	public long getRequestId() {
+		return requestId;
+	}
+
+	/**
+	 * Returns the failure cause.
+	 *
+	 * @return Failure cause
+	 */
+	public Throwable getCause() {
+		return cause;
+	}
+
+	@Override
+	public String toString() {
+		return "KvStateRequestFailure{" +
+				"requestId=" + requestId +
+				", cause=" + cause +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java
new file mode 100644
index 0000000..2bd8a36
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty.message;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A successful response to a {@link KvStateRequest} containing the serialized
+ * result for the requested key and namespace.
+ */
+public final class KvStateRequestResult {
+
+	/** ID of the request responding to. */
+	private final long requestId;
+
+	/**
+	 * Serialized result for the requested key and namespace. If no result was
+	 * available for the specified key and namespace, this is <code>null</code>.
+	 */
+	private final byte[] serializedResult;
+
+	/**
+	 * Creates a successful {@link KvStateRequestResult} response.
+	 *
+	 * @param requestId        ID of the request responding to
+	 * @param serializedResult Serialized result or <code>null</code> if none
+	 */
+	KvStateRequestResult(long requestId, byte[] serializedResult) {
+		this.requestId = requestId;
+		this.serializedResult = Preconditions.checkNotNull(serializedResult, "Serialization result");
+	}
+
+	/**
+	 * Returns the request ID responding to.
+	 *
+	 * @return Request ID responding to
+	 */
+	public long getRequestId() {
+		return requestId;
+	}
+
+	/**
+	 * Returns the serialized result or <code>null</code> if none available.
+	 *
+	 * @return Serialized result or <code>null</code> if none available.
+	 */
+	public byte[] getSerializedResult() {
+		return serializedResult;
+	}
+
+	@Override
+	public String toString() {
+		return "KvStateRequestResult{" +
+				"requestId=" + requestId +
+				", serializedResult.length=" + serializedResult.length +
+				'}';
+	}
+}