You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/11 15:46:12 UTC

[12/14] flink git commit: [FLINK-7770][QS] Hide the queryable state behind a proxy.

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
new file mode 100644
index 0000000..e6d59de
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
+
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The base class for every client in the queryable state module.
+ * It is using pure netty to send and receive messages of type {@link MessageBody}.
+ *
+ * @param <REQ> the type of request the client will send.
+ * @param <RESP> the type of response the client expects to receive.
+ */
+@Internal
+public class Client<REQ extends MessageBody, RESP extends MessageBody> {
+
+	/** The name of the client. Used for logging and stack traces.*/
+	private final String clientName;
+
+	/** Netty's Bootstrap. */
+	private final Bootstrap bootstrap;
+
+	/** The serializer to be used for (de-)serializing messages. */
+	private final MessageSerializer<REQ, RESP> messageSerializer;
+
+	/** Statistics tracker. */
+	private final KvStateRequestStats stats;
+
+	/** Established connections. */
+	private final Map<KvStateServerAddress, EstablishedConnection> establishedConnections = new ConcurrentHashMap<>();
+
+	/** Pending connections. */
+	private final Map<KvStateServerAddress, PendingConnection> pendingConnections = new ConcurrentHashMap<>();
+
+	/** Atomic shut down flag. */
+	private final AtomicBoolean shutDown = new AtomicBoolean();
+
+	/**
+	 * Creates a client with the specified number of event loop threads.
+	 *
+	 * @param clientName the name of the client.
+	 * @param numEventLoopThreads number of event loop threads (minimum 1).
+	 * @param serializer the serializer used to (de-)serialize messages.
+	 * @param stats the statistics collector.
+	 */
+	public Client(
+			final String clientName,
+			final int numEventLoopThreads,
+			final MessageSerializer<REQ, RESP> serializer,
+			final KvStateRequestStats stats) {
+
+		Preconditions.checkArgument(numEventLoopThreads >= 1,
+				"Non-positive number of event loop threads.");
+
+		this.clientName = Preconditions.checkNotNull(clientName);
+		this.messageSerializer = Preconditions.checkNotNull(serializer);
+		this.stats = Preconditions.checkNotNull(stats);
+
+		final ThreadFactory threadFactory = new ThreadFactoryBuilder()
+				.setDaemon(true)
+				.setNameFormat("Flink " + clientName + " Event Loop Thread %d")
+				.build();
+
+		final EventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
+		final ByteBufAllocator bufferPool = new NettyBufferPool(numEventLoopThreads);
+
+		this.bootstrap = new Bootstrap()
+				.group(nioGroup)
+				.channel(NioSocketChannel.class)
+				.option(ChannelOption.ALLOCATOR, bufferPool)
+				.handler(new ChannelInitializer<SocketChannel>() {
+					@Override
+					protected void initChannel(SocketChannel channel) throws Exception {
+						channel.pipeline()
+								.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
+								.addLast(new ChunkedWriteHandler());
+					}
+				});
+	}
+
+	public String getClientName() {
+		return clientName;
+	}
+
+	public CompletableFuture<RESP> sendRequest(final KvStateServerAddress serverAddress, final REQ request) {
+		if (shutDown.get()) {
+			return FutureUtils.getFailedFuture(new IllegalStateException("Shut down"));
+		}
+
+		EstablishedConnection connection = establishedConnections.get(serverAddress);
+		if (connection != null) {
+			return connection.sendRequest(request);
+		} else {
+			PendingConnection pendingConnection = pendingConnections.get(serverAddress);
+			if (pendingConnection != null) {
+				// There was a race, use the existing pending connection.
+				return pendingConnection.sendRequest(request);
+			} else {
+				// We try to connect to the server.
+				PendingConnection pending = new PendingConnection(serverAddress, messageSerializer);
+				PendingConnection previous = pendingConnections.putIfAbsent(serverAddress, pending);
+
+				if (previous == null) {
+					// OK, we are responsible to connect.
+					bootstrap.connect(serverAddress.getHost(), serverAddress.getPort()).addListener(pending);
+					return pending.sendRequest(request);
+				} else {
+					// There was a race, use the existing pending connection.
+					return previous.sendRequest(request);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Shuts down the client and closes all connections.
+	 *
+	 * <p>After a call to this method, all returned futures will be failed.
+	 */
+	public void shutdown() {
+		if (shutDown.compareAndSet(false, true)) {
+			for (Map.Entry<KvStateServerAddress, EstablishedConnection> conn : establishedConnections.entrySet()) {
+				if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
+					conn.getValue().close();
+				}
+			}
+
+			for (Map.Entry<KvStateServerAddress, PendingConnection> conn : pendingConnections.entrySet()) {
+				if (pendingConnections.remove(conn.getKey()) != null) {
+					conn.getValue().close();
+				}
+			}
+
+			if (bootstrap != null) {
+				EventLoopGroup group = bootstrap.group();
+				if (group != null) {
+					group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+				}
+			}
+		}
+	}
+
+	/**
+	 * A pending connection that is in the process of connecting.
+	 */
+	private class PendingConnection implements ChannelFutureListener {
+
+		/** Lock to guard the connect call, channel hand in, etc. */
+		private final Object connectLock = new Object();
+
+		/** Address of the server we are connecting to. */
+		private final KvStateServerAddress serverAddress;
+
+		private final MessageSerializer<REQ, RESP> serializer;
+
+		/** Queue of requests while connecting. */
+		private final ArrayDeque<PendingRequest> queuedRequests = new ArrayDeque<>();
+
+		/** The established connection after the connect succeeds. */
+		private EstablishedConnection established;
+
+		/** Closed flag. */
+		private boolean closed;
+
+		/** Failure cause if something goes wrong. */
+		private Throwable failureCause;
+
+		/**
+		 * Creates a pending connection to the given server.
+		 *
+		 * @param serverAddress Address of the server to connect to.
+		 */
+		private PendingConnection(
+				final KvStateServerAddress serverAddress,
+				final MessageSerializer<REQ, RESP> serializer) {
+			this.serverAddress = serverAddress;
+			this.serializer = serializer;
+		}
+
+		@Override
+		public void operationComplete(ChannelFuture future) throws Exception {
+			if (future.isSuccess()) {
+				handInChannel(future.channel());
+			} else {
+				close(future.cause());
+			}
+		}
+
+		/**
+		 * Returns a future holding the serialized request result.
+		 *
+		 * <p>If the channel has been established, forward the call to the
+		 * established channel, otherwise queue it for when the channel is
+		 * handed in.
+		 *
+		 * @param request the request to be sent.
+		 * @return Future holding the serialized result
+		 */
+		public CompletableFuture<RESP> sendRequest(REQ request) {
+			synchronized (connectLock) {
+				if (failureCause != null) {
+					return FutureUtils.getFailedFuture(failureCause);
+				} else if (closed) {
+					return FutureUtils.getFailedFuture(new ClosedChannelException());
+				} else {
+					if (established != null) {
+						return established.sendRequest(request);
+					} else {
+						// Queue this and handle when connected
+						final PendingRequest pending = new PendingRequest(request);
+						queuedRequests.add(pending);
+						return pending;
+					}
+				}
+			}
+		}
+
+		/**
+		 * Hands in a channel after a successful connection.
+		 *
+		 * @param channel Channel to hand in
+		 */
+		private void handInChannel(Channel channel) {
+			synchronized (connectLock) {
+				if (closed || failureCause != null) {
+					// Close the channel and we are done. Any queued requests
+					// are removed on the close/failure call and after that no
+					// new ones can be enqueued.
+					channel.close();
+				} else {
+					established = new EstablishedConnection(serverAddress, serializer, channel);
+
+					while (!queuedRequests.isEmpty()) {
+						final PendingRequest pending = queuedRequests.poll();
+
+						established.sendRequest(pending.request)
+								.thenAccept(resp -> pending.complete(resp))
+								.exceptionally(throwable -> {
+									pending.completeExceptionally(throwable);
+									return null;
+						});
+					}
+
+					// Publish the channel for the general public
+					establishedConnections.put(serverAddress, established);
+					pendingConnections.remove(serverAddress);
+
+					// Check shut down for possible race with shut down. We
+					// don't want any lingering connections after shut down,
+					// which can happen if we don't check this here.
+					if (shutDown.get()) {
+						if (establishedConnections.remove(serverAddress, established)) {
+							established.close();
+						}
+					}
+				}
+			}
+		}
+
+		/**
+		 * Close the connecting channel with a ClosedChannelException.
+		 */
+		private void close() {
+			close(new ClosedChannelException());
+		}
+
+		/**
+		 * Close the connecting channel with an Exception (can be {@code null})
+		 * or forward to the established channel.
+		 */
+		private void close(Throwable cause) {
+			synchronized (connectLock) {
+				if (!closed) {
+					if (failureCause == null) {
+						failureCause = cause;
+					}
+
+					if (established != null) {
+						established.close();
+					} else {
+						PendingRequest pending;
+						while ((pending = queuedRequests.poll()) != null) {
+							pending.completeExceptionally(cause);
+						}
+					}
+					closed = true;
+				}
+			}
+		}
+
+		@Override
+		public String toString() {
+			synchronized (connectLock) {
+				return "PendingConnection{" +
+						"serverAddress=" + serverAddress +
+						", queuedRequests=" + queuedRequests.size() +
+						", established=" + (established != null) +
+						", closed=" + closed +
+						'}';
+			}
+		}
+
+		/**
+		 * A pending request queued while the channel is connecting.
+		 */
+		private final class PendingRequest extends CompletableFuture<RESP> {
+
+			private final REQ request;
+
+			private PendingRequest(REQ request) {
+				this.request = request;
+			}
+		}
+	}
+
+	/**
+	 * An established connection that wraps the actual channel instance and is
+	 * registered at the {@link ClientHandler} for callbacks.
+	 */
+	private class EstablishedConnection implements ClientHandlerCallback<RESP> {
+
+		/** Address of the server we are connected to. */
+		private final KvStateServerAddress serverAddress;
+
+		/** The actual TCP channel. */
+		private final Channel channel;
+
+		/** Pending requests keyed by request ID. */
+		private final ConcurrentHashMap<Long, TimestampedCompletableFuture> pendingRequests = new ConcurrentHashMap<>();
+
+		/** Current request number used to assign unique request IDs. */
+		private final AtomicLong requestCount = new AtomicLong();
+
+		/** Reference to a failure that was reported by the channel. */
+		private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
+
+		/**
+		 * Creates an established connection with the given channel.
+		 *
+		 * @param serverAddress Address of the server connected to
+		 * @param channel The actual TCP channel
+		 */
+		EstablishedConnection(
+				final KvStateServerAddress serverAddress,
+				final MessageSerializer<REQ, RESP> serializer,
+				final Channel channel) {
+
+			this.serverAddress = Preconditions.checkNotNull(serverAddress);
+			this.channel = Preconditions.checkNotNull(channel);
+
+			// Add the client handler with the callback
+			channel.pipeline().addLast(
+					getClientName() + " Handler",
+					new ClientHandler<>(clientName, serializer, this)
+			);
+
+			stats.reportActiveConnection();
+		}
+
+		/**
+		 * Close the channel with a ClosedChannelException.
+		 */
+		void close() {
+			close(new ClosedChannelException());
+		}
+
+		/**
+		 * Close the channel with a cause.
+		 *
+		 * @param cause The cause to close the channel with.
+		 * @return Channel close future
+		 */
+		private boolean close(Throwable cause) {
+			if (failureCause.compareAndSet(null, cause)) {
+				channel.close();
+				stats.reportInactiveConnection();
+
+				for (long requestId : pendingRequests.keySet()) {
+					TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
+					if (pending != null && pending.completeExceptionally(cause)) {
+						stats.reportFailedRequest();
+					}
+				}
+				return true;
+			}
+			return false;
+		}
+
+		/**
+		 * Returns a future holding the serialized request result.
+		 * @param request the request to be sent.
+		 * @return Future holding the serialized result
+		 */
+		CompletableFuture<RESP> sendRequest(REQ request) {
+			TimestampedCompletableFuture requestPromiseTs =
+					new TimestampedCompletableFuture(System.nanoTime());
+			try {
+				final long requestId = requestCount.getAndIncrement();
+				pendingRequests.put(requestId, requestPromiseTs);
+
+				stats.reportRequest();
+
+				ByteBuf buf = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
+
+				channel.writeAndFlush(buf).addListener((ChannelFutureListener) future -> {
+					if (!future.isSuccess()) {
+						// Fail promise if not failed to write
+						TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
+						if (pending != null && pending.completeExceptionally(future.cause())) {
+							stats.reportFailedRequest();
+						}
+					}
+				});
+
+				// Check failure for possible race. We don't want any lingering
+				// promises after a failure, which can happen if we don't check
+				// this here. Note that close is treated as a failure as well.
+				Throwable failure = failureCause.get();
+				if (failure != null) {
+					// Remove from pending requests to guard against concurrent
+					// removal and to make sure that we only count it once as failed.
+					TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
+					if (pending != null && pending.completeExceptionally(failure)) {
+						stats.reportFailedRequest();
+					}
+				}
+			} catch (Throwable t) {
+				requestPromiseTs.completeExceptionally(t);
+			}
+
+			return requestPromiseTs;
+		}
+
+		@Override
+		public void onRequestResult(long requestId, RESP response) {
+			TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
+			if (pending != null && pending.complete(response)) {
+				long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
+				stats.reportSuccessfulRequest(durationMillis);
+			}
+		}
+
+		@Override
+		public void onRequestFailure(long requestId, Throwable cause) {
+			TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
+			if (pending != null && pending.completeExceptionally(cause)) {
+				stats.reportFailedRequest();
+			}
+		}
+
+		@Override
+		public void onFailure(Throwable cause) {
+			if (close(cause)) {
+				// Remove from established channels, otherwise future
+				// requests will be handled by this failed channel.
+				establishedConnections.remove(serverAddress, this);
+			}
+		}
+
+		@Override
+		public String toString() {
+			return "EstablishedConnection{" +
+					"serverAddress=" + serverAddress +
+					", channel=" + channel +
+					", pendingRequests=" + pendingRequests.size() +
+					", requestCount=" + requestCount +
+					", failureCause=" + failureCause +
+					'}';
+		}
+
+		/**
+		 * Pair of promise and a timestamp.
+		 */
+		private class TimestampedCompletableFuture extends CompletableFuture<RESP> {
+
+			private final long timestampInNanos;
+
+			TimestampedCompletableFuture(long timestampInNanos) {
+				this.timestampInNanos = timestampInNanos;
+			}
+
+			public long getTimestamp() {
+				return timestampInNanos;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
new file mode 100644
index 0000000..fc9b1d4
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.network.messages.RequestFailure;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * The handler used by a {@link Client} to handling incoming messages.
+ *
+ * @param <REQ> the type of request the client will send.
+ * @param <RESP> the type of response the client expects to receive.
+ */
+@Internal
+public class ClientHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ClientHandler.class);
+
+	private final String clientName;
+
+	private final MessageSerializer<REQ, RESP> serializer;
+
+	private final ClientHandlerCallback<RESP> callback;
+
+	/**
+	 * Creates a handler with the callback.
+	 *
+	 * @param clientName the name of the client.
+	 * @param serializer the serializer used to (de-)serialize messages.
+	 * @param callback Callback for responses.
+	 */
+	public ClientHandler(
+			final String clientName,
+			final MessageSerializer<REQ, RESP> serializer,
+			final ClientHandlerCallback<RESP> callback) {
+
+		this.clientName = Preconditions.checkNotNull(clientName);
+		this.serializer = Preconditions.checkNotNull(serializer);
+		this.callback = Preconditions.checkNotNull(callback);
+	}
+
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+		try {
+			ByteBuf buf = (ByteBuf) msg;
+			MessageType msgType = MessageSerializer.deserializeHeader(buf);
+
+			if (msgType == MessageType.REQUEST_RESULT) {
+				long requestId = MessageSerializer.getRequestId(buf);
+				RESP result = serializer.deserializeResponse(buf);
+				callback.onRequestResult(requestId, result);
+			} else if (msgType == MessageType.REQUEST_FAILURE) {
+				RequestFailure failure = MessageSerializer.deserializeRequestFailure(buf);
+				callback.onRequestFailure(failure.getRequestId(), failure.getCause());
+			} else if (msgType == MessageType.SERVER_FAILURE) {
+				throw MessageSerializer.deserializeServerFailure(buf);
+			} else {
+				throw new IllegalStateException("Unexpected response type '" + msgType + "'");
+			}
+		} catch (Throwable t1) {
+			try {
+				callback.onFailure(t1);
+			} catch (Throwable t2) {
+				LOG.error("Failed to notify callback about failure", t2);
+			}
+		} finally {
+			ReferenceCountUtil.release(msg);
+		}
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		try {
+			callback.onFailure(cause);
+		} catch (Throwable t) {
+			LOG.error("Failed to notify callback about failure", t);
+		}
+	}
+
+	@Override
+	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+		// Only the client is expected to close the channel. Otherwise it
+		// indicates a failure. Note that this will be invoked in both cases
+		// though. If the callback closed the channel, the callback must be
+		// ignored.
+		try {
+			callback.onFailure(new ClosedChannelException());
+		} catch (Throwable t) {
+			LOG.error("Failed to notify callback about failure", t);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
new file mode 100644
index 0000000..00ce1ed
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+
+/**
+ * Callback for {@link ClientHandler}.
+ */
+@Internal
+public interface ClientHandlerCallback<RESP extends MessageBody> {
+
+	/**
+	 * Called on a successful request.
+	 *
+	 * @param requestId			ID of the request
+	 * @param response			The received response
+	 */
+	void onRequestResult(long requestId, RESP response);
+
+	/**
+	 * Called on a failed request.
+	 *
+	 * @param requestId ID of the request
+	 * @param cause     Cause of the request failure
+	 */
+	void onRequestFailure(long requestId, Throwable cause);
+
+	/**
+	 * Called on any failure, which is not related to a specific request.
+	 *
+	 * <p>This can be for example a caught Exception in the channel pipeline
+	 * or an unexpected channel close.
+	 *
+	 * @param cause Cause of the failure
+	 */
+	void onFailure(Throwable cause);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
new file mode 100644
index 0000000..f26c267
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The base class for every message exchanged during the communication between
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
+ *
+ * <p>Every such message should also have a {@link MessageDeserializer}.
+ */
+@Internal
+public abstract class MessageBody {
+
+	/**
+	 * Serializes the message into a byte array.
+	 * @return A byte array with the serialized content of the message.
+	 */
+	public abstract byte[] serialize();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
new file mode 100644
index 0000000..436fb82
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+/**
+ * A utility used to deserialize a {@link MessageBody message}.
+ * @param <M> The type of the message to be deserialized.
+ *           It has to extend {@link MessageBody}
+ */
+@Internal
+public interface MessageDeserializer<M extends MessageBody> {
+
+	/**
+	 * Deserializes a message contained in a byte buffer.
+	 * @param buf the buffer containing the message.
+	 * @return The deserialized message.
+	 */
+	M deserializeMessage(ByteBuf buf);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
index 32bca64..c0a0d32 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
@@ -18,11 +18,7 @@
 
 package org.apache.flink.queryablestate.network.messages;
 
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
-import org.apache.flink.queryablestate.messages.KvStateRequestResult;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.util.AbstractID;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -37,8 +33,8 @@ import java.io.ObjectOutputStream;
 
 /**
  * Serialization and deserialization of messages exchanged between
- * {@link org.apache.flink.queryablestate.client.KvStateClient client} and
- * {@link org.apache.flink.queryablestate.server.KvStateServerImpl server}.
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
  *
  * <p>The binary messages have the following format:
  *
@@ -52,8 +48,12 @@ import java.io.ObjectOutputStream;
  * </pre>
  *
  * <p>The concrete content of a message depends on the {@link MessageType}.
+ *
+ * @param <REQ>		Type of the requests of the protocol.
+ * @param <RESP>	Type of the responses of the protocol.
  */
-public final class MessageSerializer {
+@Internal
+public final class MessageSerializer<REQ extends MessageBody, RESP extends MessageBody> {
 
 	/** The serialization version ID. */
 	private static final int VERSION = 0x79a1b710;
@@ -64,78 +64,58 @@ public final class MessageSerializer {
 	/** Byte length of the request id. */
 	private static final int REQUEST_ID_SIZE = Long.BYTES;
 
+	/** The constructor of the {@link MessageBody client requests}. Used for deserialization. */
+	private final MessageDeserializer<REQ> requestDeserializer;
+
+	/** The constructor of the {@link MessageBody server responses}. Used for deserialization. */
+	private final MessageDeserializer<RESP> responseDeserializer;
+
+	public MessageSerializer(MessageDeserializer<REQ> requestDeser, MessageDeserializer<RESP> responseDeser) {
+		requestDeserializer = Preconditions.checkNotNull(requestDeser);
+		responseDeserializer = Preconditions.checkNotNull(responseDeser);
+	}
+
 	// ------------------------------------------------------------------------
 	// Serialization
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Allocates a buffer and serializes the KvState request into it.
+	 * Serializes the request sent to the
+	 * {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
 	 *
-	 * @param alloc                     ByteBuf allocator for the buffer to
-	 *                                  serialize message into
-	 * @param requestId                 ID for this request
-	 * @param kvStateId                 ID of the requested KvState instance
-	 * @param serializedKeyAndNamespace Serialized key and namespace to request
-	 *                                  from the KvState instance.
-	 * @return Serialized KvState request message
+	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+	 * @param requestId		The id of the request to which the message refers to.
+	 * @param request		The request to be serialized.
+	 * @return A {@link ByteBuf} containing the serialized message.
 	 */
-	public static ByteBuf serializeKvStateRequest(
-			ByteBufAllocator alloc,
-			long requestId,
-			KvStateID kvStateId,
-			byte[] serializedKeyAndNamespace) {
-
-		// Header + request ID + KvState ID + Serialized namespace
-		int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + AbstractID.SIZE + (Integer.BYTES + serializedKeyAndNamespace.length);
-		ByteBuf buf = alloc.ioBuffer(frameLength + 4); // +4 for frame length
-
-		buf.writeInt(frameLength);
-
-		writeHeader(buf, MessageType.REQUEST);
-
-		buf.writeLong(requestId);
-		buf.writeLong(kvStateId.getLowerPart());
-		buf.writeLong(kvStateId.getUpperPart());
-		buf.writeInt(serializedKeyAndNamespace.length);
-		buf.writeBytes(serializedKeyAndNamespace);
-
-		return buf;
+	public static <REQ extends MessageBody> ByteBuf serializeRequest(
+			final ByteBufAllocator alloc,
+			final long requestId,
+			final REQ request) {
+		Preconditions.checkNotNull(request);
+		return writePayload(alloc, requestId, MessageType.REQUEST, request.serialize());
 	}
 
 	/**
-	 * Allocates a buffer and serializes the KvState request result into it.
+	 * Serializes the response sent to the
+	 * {@link org.apache.flink.queryablestate.network.Client}.
 	 *
-	 * @param alloc             ByteBuf allocator for the buffer to serialize message into
-	 * @param requestId         ID for this request
-	 * @param serializedResult  Serialized Result
-	 * @return Serialized KvState request result message
+	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+	 * @param requestId		The id of the request to which the message refers to.
+	 * @param response		The response to be serialized.
+	 * @return A {@link ByteBuf} containing the serialized message.
 	 */
-	public static ByteBuf serializeKvStateRequestResult(
-			ByteBufAllocator alloc,
-			long requestId,
-			byte[] serializedResult) {
-
-		Preconditions.checkNotNull(serializedResult, "Serialized result");
-
-		// Header + request ID + serialized result
-		int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + 4 + serializedResult.length;
-
-		// TODO: 10/5/17 there was a bug all this time?
-		ByteBuf buf = alloc.ioBuffer(frameLength + 4);
-
-		buf.writeInt(frameLength);
-		writeHeader(buf, MessageType.REQUEST_RESULT);
-		buf.writeLong(requestId);
-
-		buf.writeInt(serializedResult.length);
-		buf.writeBytes(serializedResult);
-
-		return buf;
+	public static <RESP extends MessageBody> ByteBuf serializeResponse(
+			final ByteBufAllocator alloc,
+			final long requestId,
+			final RESP response) {
+		Preconditions.checkNotNull(response);
+		return writePayload(alloc, requestId, MessageType.REQUEST_RESULT, response.serialize());
 	}
 
 	/**
 	 * Serializes the exception containing the failure message sent to the
-	 * {@link org.apache.flink.queryablestate.client.KvStateClient} in case of
+	 * {@link org.apache.flink.queryablestate.network.Client} in case of
 	 * protocol related errors.
 	 *
 	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
@@ -143,7 +123,7 @@ public final class MessageSerializer {
 	 * @param cause			The exception thrown at the server.
 	 * @return A {@link ByteBuf} containing the serialized message.
 	 */
-	public static ByteBuf serializeKvStateRequestFailure(
+	public static ByteBuf serializeRequestFailure(
 			final ByteBufAllocator alloc,
 			final long requestId,
 			final Throwable cause) throws IOException {
@@ -168,7 +148,7 @@ public final class MessageSerializer {
 
 	/**
 	 * Serializes the failure message sent to the
-	 * {@link org.apache.flink.queryablestate.client.KvStateClient} in case of
+	 * {@link org.apache.flink.queryablestate.network.Client} in case of
 	 * server related errors.
 	 *
 	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
@@ -207,6 +187,31 @@ public final class MessageSerializer {
 		buf.writeInt(messageType.ordinal());
 	}
 
+	/**
+	 * Helper for serializing the messages.
+	 *
+	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+	 * @param requestId		The id of the request to which the message refers to.
+	 * @param messageType	The {@link MessageType type of the message}.
+	 * @param payload		The serialized version of the message.
+	 * @return A {@link ByteBuf} containing the serialized message.
+	 */
+	private static ByteBuf writePayload(
+			final ByteBufAllocator alloc,
+			final long requestId,
+			final MessageType messageType,
+			final byte[] payload) {
+
+		final int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + payload.length;
+		final ByteBuf buf = alloc.ioBuffer(frameLength + Integer.BYTES);
+
+		buf.writeInt(frameLength);
+		writeHeader(buf, messageType);
+		buf.writeLong(requestId);
+		buf.writeBytes(payload);
+		return buf;
+	}
+
 	// ------------------------------------------------------------------------
 	// Deserialization
 	// ------------------------------------------------------------------------
@@ -230,71 +235,54 @@ public final class MessageSerializer {
 		// fetching the message type
 		int msgType = buf.readInt();
 		MessageType[] values = MessageType.values();
-		Preconditions.checkState(msgType >= 0 && msgType <= values.length,
+		Preconditions.checkState(msgType >= 0 && msgType < values.length,
 				"Illegal message type with index " + msgType + '.');
 		return values[msgType];
 	}
 
 	/**
-	 * Deserializes the KvState request message.
-	 *
-	 * <p><strong>Important</strong>: the returned buffer is sliced from the
-	 * incoming ByteBuf stream and retained. Therefore, it needs to be recycled
-	 * by the consumer.
-	 *
-	 * @param buf Buffer to deserialize (expected to be positioned after header)
-	 * @return Deserialized KvStateRequest
+	 * De-serializes the header and returns the {@link MessageType}.
+	 * <pre>
+	 *  <b>The buffer is expected to be at the request id position.</b>
+	 * </pre>
+	 * @param buf	The {@link ByteBuf} containing the serialized request id.
+	 * @return		The request id.
 	 */
-	public static KvStateRequest deserializeKvStateRequest(ByteBuf buf) {
-		long requestId = buf.readLong();
-		KvStateID kvStateId = new KvStateID(buf.readLong(), buf.readLong());
-
-		// Serialized key and namespace
-		int length = buf.readInt();
-
-		if (length < 0) {
-			throw new IllegalArgumentException("Negative length for serialized key and namespace. " +
-					"This indicates a serialization error.");
-		}
-
-		// Copy the buffer in order to be able to safely recycle the ByteBuf
-		byte[] serializedKeyAndNamespace = new byte[length];
-		if (length > 0) {
-			buf.readBytes(serializedKeyAndNamespace);
-		}
-
-		return new KvStateRequest(requestId, kvStateId, serializedKeyAndNamespace);
+	public static long getRequestId(final ByteBuf buf) {
+		return buf.readLong();
 	}
 
 	/**
-	 * Deserializes the KvState request result.
-	 *
-	 * @param buf Buffer to deserialize (expected to be positioned after header)
-	 * @return Deserialized KvStateRequestResult
+	 * De-serializes the request sent to the
+	 * {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
+	 * <pre>
+	 *  <b>The buffer is expected to be at the request position.</b>
+	 * </pre>
+	 * @param buf	The {@link ByteBuf} containing the serialized request.
+	 * @return		The request.
 	 */
-	public static KvStateRequestResult deserializeKvStateRequestResult(ByteBuf buf) {
-		long requestId = buf.readLong();
-
-		// Serialized KvState
-		int length = buf.readInt();
-
-		if (length < 0) {
-			throw new IllegalArgumentException("Negative length for serialized result. " +
-					"This indicates a serialization error.");
-		}
-
-		byte[] serializedValue = new byte[length];
-
-		if (length > 0) {
-			buf.readBytes(serializedValue);
-		}
+	public REQ deserializeRequest(final ByteBuf buf) {
+		Preconditions.checkNotNull(buf);
+		return requestDeserializer.deserializeMessage(buf);
+	}
 
-		return new KvStateRequestResult(requestId, serializedValue);
+	/**
+	 * De-serializes the response sent to the
+	 * {@link org.apache.flink.queryablestate.network.Client}.
+	 * <pre>
+	 *  <b>The buffer is expected to be at the response position.</b>
+	 * </pre>
+	 * @param buf	The {@link ByteBuf} containing the serialized response.
+	 * @return		The response.
+	 */
+	public RESP deserializeResponse(final ByteBuf buf) {
+		Preconditions.checkNotNull(buf);
+		return responseDeserializer.deserializeMessage(buf);
 	}
 
 	/**
-	 * De-serializes the {@link KvStateRequestFailure} sent to the
-	 * {@link org.apache.flink.queryablestate.client.KvStateClient} in case of
+	 * De-serializes the {@link RequestFailure} sent to the
+	 * {@link org.apache.flink.queryablestate.network.Client} in case of
 	 * protocol related errors.
 	 * <pre>
 	 *  <b>The buffer is expected to be at the correct position.</b>
@@ -302,7 +290,7 @@ public final class MessageSerializer {
 	 * @param buf	The {@link ByteBuf} containing the serialized failure message.
 	 * @return		The failure message.
 	 */
-	public static KvStateRequestFailure deserializeKvStateRequestFailure(final ByteBuf buf) throws IOException, ClassNotFoundException {
+	public static RequestFailure deserializeRequestFailure(final ByteBuf buf) throws IOException, ClassNotFoundException {
 		long requestId = buf.readLong();
 
 		Throwable cause;
@@ -310,12 +298,12 @@ public final class MessageSerializer {
 				ObjectInputStream in = new ObjectInputStream(bis)) {
 			cause = (Throwable) in.readObject();
 		}
-		return new KvStateRequestFailure(requestId, cause);
+		return new RequestFailure(requestId, cause);
 	}
 
 	/**
 	 * De-serializes the failure message sent to the
-	 * {@link org.apache.flink.queryablestate.client.KvStateClient} in case of
+	 * {@link org.apache.flink.queryablestate.network.Client} in case of
 	 * server related errors.
 	 * <pre>
 	 *  <b>The buffer is expected to be at the correct position.</b>

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
index 4e4435d..562ce93 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.queryablestate.network.messages;
 
+import org.apache.flink.annotation.Internal;
+
 /**
  * Expected message types during the communication between
- * {@link org.apache.flink.queryablestate.client.KvStateClient state client} and
- * {@link org.apache.flink.queryablestate.server.KvStateServerImpl state server}.
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
  */
+@Internal
 public enum MessageType {
 
 	/** The message is a request. */

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
new file mode 100644
index 0000000..106199f
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A message indicating a protocol-related error.
+ */
+@Internal
+public class RequestFailure {
+
+	/** ID of the request responding to. */
+	private final long requestId;
+
+	/** Failure cause. Not allowed to be a user type. */
+	private final Throwable cause;
+
+	/**
+	 * Creates a failure response to a {@link MessageBody}.
+	 *
+	 * @param requestId ID for the request responding to
+	 * @param cause     Failure cause (not allowed to be a user type)
+	 */
+	public RequestFailure(long requestId, Throwable cause) {
+		this.requestId = requestId;
+		this.cause = cause;
+	}
+
+	/**
+	 * Returns the request ID responding to.
+	 *
+	 * @return Request ID responding to
+	 */
+	public long getRequestId() {
+		return requestId;
+	}
+
+	/**
+	 * Returns the failure cause.
+	 *
+	 * @return Failure cause
+	 */
+	public Throwable getCause() {
+		return cause;
+	}
+
+	@Override
+	public String toString() {
+		return "RequestFailure{" +
+				"requestId=" + requestId +
+				", cause=" + cause +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/ChunkedByteBuf.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/ChunkedByteBuf.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/ChunkedByteBuf.java
deleted file mode 100644
index f10969e..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/ChunkedByteBuf.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.server;
-
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-
-/**
- * A {@link ByteBuf} instance to be consumed in chunks by {@link ChunkedWriteHandler},
- * respecting the high and low watermarks.
- *
- * @see <a href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High Watermarks</a>
- */
-public class ChunkedByteBuf implements ChunkedInput<ByteBuf> {
-
-	/** The buffer to chunk. */
-	private final ByteBuf buf;
-
-	/** Size of chunks. */
-	private final int chunkSize;
-
-	/** Closed flag. */
-	private boolean isClosed;
-
-	/** End of input flag. */
-	private boolean isEndOfInput;
-
-	public ChunkedByteBuf(ByteBuf buf, int chunkSize) {
-		this.buf = Preconditions.checkNotNull(buf, "Buffer");
-		Preconditions.checkArgument(chunkSize > 0, "Non-positive chunk size");
-		this.chunkSize = chunkSize;
-	}
-
-	@Override
-	public boolean isEndOfInput() throws Exception {
-		return isClosed || isEndOfInput;
-	}
-
-	@Override
-	public void close() throws Exception {
-		if (!isClosed) {
-			// If we did not consume the whole buffer yet, we have to release
-			// it here. Otherwise, it's the responsibility of the consumer.
-			if (!isEndOfInput) {
-				buf.release();
-			}
-
-			isClosed = true;
-		}
-	}
-
-	@Override
-	public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
-		if (isClosed) {
-			return null;
-		} else if (buf.readableBytes() <= chunkSize) {
-			isEndOfInput = true;
-
-			// Don't retain as the consumer is responsible to release it
-			return buf.slice();
-		} else {
-			// Return a chunk sized slice of the buffer. The ref count is
-			// shared with the original buffer. That's why we need to retain
-			// a reference here.
-			return buf.readSlice(chunkSize).retain();
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "ChunkedByteBuf{" +
-				"buf=" + buf +
-				", chunkSize=" + chunkSize +
-				", isClosed=" + isClosed +
-				", isEndOfInput=" + isEndOfInput +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
index 9a31fca..055a5d0 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -18,31 +18,25 @@
 
 package org.apache.flink.queryablestate.server;
 
-import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
-import org.apache.flink.queryablestate.UnknownKvStateID;
-import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
+import org.apache.flink.queryablestate.UnknownKvStateIdException;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.netty.KvStateRequestStats;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * This handler dispatches asynchronous tasks, which query {@link InternalKvState}
@@ -52,257 +46,62 @@ import java.util.concurrent.TimeUnit;
  * query task. The actual query is handled in a separate thread as it might
  * otherwise block the network threads (file I/O etc.).
  */
+@Internal
 @ChannelHandler.Sharable
-public class KvStateServerHandler extends ChannelInboundHandlerAdapter {
+public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class);
 
 	/** KvState registry holding references to the KvState instances. */
 	private final KvStateRegistry registry;
 
-	/** Thread pool for query execution. */
-	private final ExecutorService queryExecutor;
-
-	/** Exposed server statistics. */
-	private final KvStateRequestStats stats;
-
 	/**
-	 * Create the handler.
+	 * Create the handler used by the {@link KvStateServerImpl}.
 	 *
-	 * @param kvStateRegistry Registry to query.
-	 * @param queryExecutor   Thread pool for query execution.
-	 * @param stats           Exposed server statistics.
+	 * @param server the {@link KvStateServerImpl} using the handler.
+	 * @param kvStateRegistry registry to query.
+	 * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages.
+	 * @param stats server statistics collector.
 	 */
 	public KvStateServerHandler(
-			KvStateRegistry kvStateRegistry,
-			ExecutorService queryExecutor,
-			KvStateRequestStats stats) {
+			final KvStateServerImpl server,
+			final KvStateRegistry kvStateRegistry,
+			final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer,
+			final KvStateRequestStats stats) {
 
-		this.registry = Objects.requireNonNull(kvStateRegistry, "KvStateRegistry");
-		this.queryExecutor = Objects.requireNonNull(queryExecutor, "Query thread pool");
-		this.stats = Objects.requireNonNull(stats, "KvStateRequestStats");
+		super(server, serializer, stats);
+		this.registry = Preconditions.checkNotNull(kvStateRegistry);
 	}
 
 	@Override
-	public void channelActive(ChannelHandlerContext ctx) throws Exception {
-		stats.reportActiveConnection();
-	}
-
-	@Override
-	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-		stats.reportInactiveConnection();
-	}
-
-	@Override
-	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-		KvStateRequest request = null;
+	public CompletableFuture<KvStateResponse> handleRequest(final long requestId, final KvStateInternalRequest request) {
+		final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
 
 		try {
-			ByteBuf buf = (ByteBuf) msg;
-			MessageType msgType = MessageSerializer.deserializeHeader(buf);
-
-			if (msgType == MessageType.REQUEST) {
-				// ------------------------------------------------------------
-				// Request
-				// ------------------------------------------------------------
-				request = MessageSerializer.deserializeKvStateRequest(buf);
-
-				stats.reportRequest();
-
-				InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
-
-				if (kvState != null) {
-					// Execute actual query async, because it is possibly
-					// blocking (e.g. file I/O).
-					//
-					// A submission failure is not treated as fatal.
-					queryExecutor.submit(new AsyncKvStateQueryTask(ctx, request, kvState, stats));
-				} else {
-					ByteBuf unknown = MessageSerializer.serializeKvStateRequestFailure(
-							ctx.alloc(),
-							request.getRequestId(),
-							new UnknownKvStateID(request.getKvStateId()));
-
-					ctx.writeAndFlush(unknown);
-
-					stats.reportFailedRequest();
-				}
+			final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
+			if (kvState == null) {
+				responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
 			} else {
-				// ------------------------------------------------------------
-				// Unexpected
-				// ------------------------------------------------------------
-				ByteBuf failure = MessageSerializer.serializeServerFailure(
-						ctx.alloc(),
-						new IllegalArgumentException("Unexpected message type " + msgType
-								+ ". KvStateServerHandler expects "
-								+ MessageType.REQUEST + " messages."));
-
-				ctx.writeAndFlush(failure);
-			}
-		} catch (Throwable t) {
-			String stringifiedCause = ExceptionUtils.stringifyException(t);
-
-			ByteBuf err;
-			if (request != null) {
-				String errMsg = "Failed to handle incoming request with ID " +
-						request.getRequestId() + ". Caused by: " + stringifiedCause;
-				err = MessageSerializer.serializeKvStateRequestFailure(
-						ctx.alloc(),
-						request.getRequestId(),
-						new RuntimeException(errMsg));
-
-				stats.reportFailedRequest();
-			} else {
-				String errMsg = "Failed to handle incoming message. Caused by: " + stringifiedCause;
-				err = MessageSerializer.serializeServerFailure(
-						ctx.alloc(),
-						new RuntimeException(errMsg));
-			}
-
-			ctx.writeAndFlush(err);
-		} finally {
-			// IMPORTANT: We have to always recycle the incoming buffer.
-			// Otherwise we will leak memory out of Netty's buffer pool.
-			//
-			// If any operation ever holds on to the buffer, it is the
-			// responsibility of that operation to retain the buffer and
-			// release it later.
-			ReferenceCountUtil.release(msg);
-		}
-	}
-
-	@Override
-	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-		String stringifiedCause = ExceptionUtils.stringifyException(cause);
-		String msg = "Exception in server pipeline. Caused by: " + stringifiedCause;
-
-		ByteBuf err = MessageSerializer.serializeServerFailure(
-				ctx.alloc(),
-				new RuntimeException(msg));
-
-		ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE);
-	}
-
-	/**
-	 * Task to execute the actual query against the {@link InternalKvState} instance.
-	 */
-	private static class AsyncKvStateQueryTask implements Runnable {
-
-		private final ChannelHandlerContext ctx;
-
-		private final KvStateRequest request;
-
-		private final InternalKvState<?> kvState;
-
-		private final KvStateRequestStats stats;
-
-		private final long creationNanos;
-
-		public AsyncKvStateQueryTask(
-				ChannelHandlerContext ctx,
-				KvStateRequest request,
-				InternalKvState<?> kvState,
-				KvStateRequestStats stats) {
-
-			this.ctx = Objects.requireNonNull(ctx, "Channel handler context");
-			this.request = Objects.requireNonNull(request, "State query");
-			this.kvState = Objects.requireNonNull(kvState, "KvState");
-			this.stats = Objects.requireNonNull(stats, "State query stats");
-			this.creationNanos = System.nanoTime();
-		}
-
-		@Override
-		public void run() {
-			boolean success = false;
-
-			try {
-				if (!ctx.channel().isActive()) {
-					return;
-				}
-
-				// Query the KvState instance
 				byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
-				byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
 
+				byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
 				if (serializedResult != null) {
-					// We found some data, success!
-					ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
-							ctx.alloc(),
-							request.getRequestId(),
-							serializedResult);
-
-					int highWatermark = ctx.channel().config().getWriteBufferHighWaterMark();
-
-					ChannelFuture write;
-					if (buf.readableBytes() <= highWatermark) {
-						write = ctx.writeAndFlush(buf);
-					} else {
-						write = ctx.writeAndFlush(new ChunkedByteBuf(buf, highWatermark));
-					}
-
-					write.addListener(new QueryResultWriteListener());
-
-					success = true;
+					responseFuture.complete(new KvStateResponse(serializedResult));
 				} else {
-					// No data for the key/namespace. This is considered to be
-					// a failure.
-					ByteBuf unknownKey = MessageSerializer.serializeKvStateRequestFailure(
-							ctx.alloc(),
-							request.getRequestId(),
-							new UnknownKeyOrNamespace());
-
-					ctx.writeAndFlush(unknownKey);
-				}
-			} catch (Throwable t) {
-				try {
-					String stringifiedCause = ExceptionUtils.stringifyException(t);
-					String errMsg = "Failed to query state backend for query " +
-							request.getRequestId() + ". Caused by: " + stringifiedCause;
-
-					ByteBuf err = MessageSerializer.serializeKvStateRequestFailure(
-							ctx.alloc(), request.getRequestId(), new RuntimeException(errMsg));
-
-					ctx.writeAndFlush(err);
-				} catch (IOException e) {
-					LOG.error("Failed to respond with the error after failed to query state backend", e);
-				}
-			} finally {
-				if (!success) {
-					stats.reportFailedRequest();
+					responseFuture.completeExceptionally(new UnknownKeyOrNamespaceException(getServerName()));
 				}
 			}
+			return responseFuture;
+		} catch (Throwable t) {
+			String errMsg = "Error while processing request with ID " + requestId +
+					". Caused by: " + ExceptionUtils.stringifyException(t);
+			responseFuture.completeExceptionally(new RuntimeException(errMsg));
+			return responseFuture;
 		}
+	}
 
-		@Override
-		public String toString() {
-			return "AsyncKvStateQueryTask{" +
-					", request=" + request +
-					", creationNanos=" + creationNanos +
-					'}';
-		}
-
-		/**
-		 * Callback after query result has been written.
-		 *
-		 * <p>Gathers stats and logs errors.
-		 */
-		private class QueryResultWriteListener implements ChannelFutureListener {
-
-			@Override
-			public void operationComplete(ChannelFuture future) throws Exception {
-				long durationNanos = System.nanoTime() - creationNanos;
-				long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
-
-				if (future.isSuccess()) {
-					stats.reportSuccessfulRequest(durationMillis);
-				} else {
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Query " + request + " failed after " + durationMillis + " ms", future.cause());
-					}
-
-					stats.reportFailedRequest();
-				}
-			}
-		}
+	@Override
+	public void shutdown() {
+		// do nothing
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index 4bf7e24..b4c548a 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -18,213 +18,93 @@
 
 package org.apache.flink.queryablestate.server;
 
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerBase;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.netty.KvStateRequestStats;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
-import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 
 /**
- * Netty-based server answering {@link KvStateRequest} messages.
- *
- * <p>Requests are handled by asynchronous query tasks (see {@link KvStateServerHandler.AsyncKvStateQueryTask})
- * that are executed by a separate query Thread pool. This pool is shared among
- * all TCP connections.
- *
- * <p>The incoming pipeline looks as follows:
- * <pre>
- * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateServerHandler
- * </pre>
- *
- * <p>Received binary messages are expected to contain a frame length field. Netty's
- * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame before
- * giving it to our {@link KvStateServerHandler}.
- *
- * <p>Connections are established and closed by the client. The server only
- * closes the connection on a fatal failure that cannot be recovered. A
- * server-side connection close is considered a failure by the client.
+ * The default implementation of the {@link KvStateServer}.
  */
-public class KvStateServerImpl implements KvStateServer {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KvStateServer.class);
+@Internal
+public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer {
 
-	/** Server config: low water mark. */
-	private static final int LOW_WATER_MARK = 8 * 1024;
+	private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class);
 
-	/** Server config: high water mark. */
-	private static final int HIGH_WATER_MARK = 32 * 1024;
+	/** The {@link KvStateRegistry} to query for state instances. */
+	private final KvStateRegistry kvStateRegistry;
 
-	/** Netty's ServerBootstrap. */
-	private final ServerBootstrap bootstrap;
+	private final KvStateRequestStats stats;
 
-	/** Query executor thread pool. */
-	private final ExecutorService queryExecutor;
-
-	/** Address of this server. */
-	private KvStateServerAddress serverAddress;
+	private MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer;
 
 	/**
-	 * Creates the {@link KvStateServer}.
+	 * Creates the state server.
+	 *
+	 * <p>The server is instantiated using reflection by the
+	 * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, int, int, int, KvStateRegistry, KvStateRequestStats)
+	 * QueryableStateUtils.startKvStateServer(InetAddress, int, int, int, KvStateRegistry, KvStateRequestStats)}.
 	 *
 	 * <p>The server needs to be started via {@link #start()} in order to bind
 	 * to the configured bind address.
 	 *
-	 * @param bindAddress         Address to bind to
-	 * @param bindPort            Port to bind to. Pick random port if 0.
-	 * @param numEventLoopThreads Number of event loop threads
-	 * @param numQueryThreads     Number of query threads
-	 * @param kvStateRegistry     KvStateRegistry to query for KvState instances
-	 * @param stats               Statistics tracker
+	 * @param bindAddress the address to listen to.
+	 * @param bindPort the port to listen to.
+	 * @param numEventLoopThreads number of event loop threads.
+	 * @param numQueryThreads number of query threads.
+	 * @param kvStateRegistry {@link KvStateRegistry} to query for state instances.
+	 * @param stats the statistics collector.
 	 */
 	public KvStateServerImpl(
-			InetAddress bindAddress,
-			Integer bindPort,
-			Integer numEventLoopThreads,
-			Integer numQueryThreads,
-			KvStateRegistry kvStateRegistry,
-			KvStateRequestStats stats) {
-
-		Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65536, "Port " + bindPort +
-				" is out of valid port range (0-65536).");
-
-		Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads.");
-		Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads.");
-
-		Preconditions.checkNotNull(kvStateRegistry, "KvStateRegistry");
-		Preconditions.checkNotNull(stats, "KvStateRequestStats");
-
-		NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
-
-		ThreadFactory threadFactory = new ThreadFactoryBuilder()
-				.setDaemon(true)
-				.setNameFormat("Flink KvStateServer EventLoop Thread %d")
-				.build();
-
-		NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
-
-		queryExecutor = createQueryExecutor(numQueryThreads);
-
-		// Shared between all channels
-		KvStateServerHandler serverHandler = new KvStateServerHandler(
-				kvStateRegistry,
-				queryExecutor,
-				stats);
-
-		bootstrap = new ServerBootstrap()
-				// Bind address and port
-				.localAddress(bindAddress, bindPort)
-				// NIO server channels
-				.group(nioGroup)
-				.channel(NioServerSocketChannel.class)
-				// Server channel Options
-				.option(ChannelOption.ALLOCATOR, bufferPool)
-				// Child channel options
-				.childOption(ChannelOption.ALLOCATOR, bufferPool)
-				.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
-				.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
-				// See initializer for pipeline details
-				.childHandler(new KvStateServerChannelInitializer(serverHandler));
+			final InetAddress bindAddress,
+			final Integer bindPort,
+			final Integer numEventLoopThreads,
+			final Integer numQueryThreads,
+			final KvStateRegistry kvStateRegistry,
+			final KvStateRequestStats stats) {
+
+		super("Queryable State Server", bindAddress, bindPort, numEventLoopThreads, numQueryThreads);
+		this.stats = Preconditions.checkNotNull(stats);
+		this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
 	}
 
 	@Override
-	public void start() throws InterruptedException {
-		Channel channel = bootstrap.bind().sync().channel();
-
-		InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
-		serverAddress = new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
+	public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> initializeHandler() {
+		this.serializer = new MessageSerializer<>(
+				new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+				new KvStateResponse.KvStateResponseDeserializer());
+		return new KvStateServerHandler(this, kvStateRegistry, serializer, stats);
 	}
 
-	@Override
-	public KvStateServerAddress getAddress() {
-		if (serverAddress == null) {
-			throw new IllegalStateException("KvStateServer not started yet.");
-		}
-
-		return serverAddress;
+	public MessageSerializer<KvStateInternalRequest, KvStateResponse> getSerializer() {
+		Preconditions.checkState(serializer != null, "Server " + getServerName() + " has not been started.");
+		return serializer;
 	}
 
 	@Override
-	public void shutDown() {
-		if (bootstrap != null) {
-			EventLoopGroup group = bootstrap.group();
-			if (group != null) {
-				group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
-			}
-		}
-
-		if (queryExecutor != null) {
-			queryExecutor.shutdown();
-		}
-
-		serverAddress = null;
+	public void start() throws InterruptedException {
+		super.start();
 	}
 
-	/**
-	 * Creates a thread pool for the query execution.
-	 *
-	 * @param numQueryThreads Number of query threads.
-	 * @return Thread pool for query execution
-	 */
-	private static ExecutorService createQueryExecutor(int numQueryThreads) {
-		ThreadFactory threadFactory = new ThreadFactoryBuilder()
-				.setDaemon(true)
-				.setNameFormat("Flink KvStateServer Query Thread %d")
-				.build();
-
-		return Executors.newFixedThreadPool(numQueryThreads, threadFactory);
+	@Override
+	public KvStateServerAddress getServerAddress() {
+		return super.getServerAddress();
 	}
 
-	/**
-	 * Channel pipeline initializer.
-	 *
-	 * <p>The request handler is shared, whereas the other handlers are created
-	 * per channel.
-	 */
-	private static final class KvStateServerChannelInitializer extends ChannelInitializer<SocketChannel> {
-
-		/** The shared request handler. */
-		private final KvStateServerHandler sharedRequestHandler;
-
-		/**
-		 * Creates the channel pipeline initializer with the shared request handler.
-		 *
-		 * @param sharedRequestHandler Shared request handler.
-		 */
-		public KvStateServerChannelInitializer(KvStateServerHandler sharedRequestHandler) {
-			this.sharedRequestHandler = Preconditions.checkNotNull(sharedRequestHandler, "Request handler");
-		}
-
-		@Override
-		protected void initChannel(SocketChannel ch) throws Exception {
-			ch.pipeline()
-					.addLast(new ChunkedWriteHandler())
-					.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
-					.addLast(sharedRequestHandler);
-		}
+	@Override
+	public void shutdown() {
+		super.shutdown();
 	}
-
 }