You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/26 17:01:57 UTC

[08/13] flink git commit: [FLINK-7908][QS] Restructure the queryable state module.

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
deleted file mode 100644
index 18a88da..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.messages.MessageBody;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * The base class of every handler used by an {@link AbstractServerBase}.
- *
- * @param <REQ> the type of request the server expects to receive.
- * @param <RESP> the type of response the server will send.
- */
-@Internal
-@ChannelHandler.Sharable
-public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter {
-
-	private static final Logger LOG = LoggerFactory.getLogger(AbstractServerHandler.class);
-
-	/** The owning server of this handler. */
-	private final AbstractServerBase<REQ, RESP> server;
-
-	/** The serializer used to (de-)serialize messages. */
-	private final MessageSerializer<REQ, RESP> serializer;
-
-	/** Thread pool for query execution. */
-	protected final ExecutorService queryExecutor;
-
-	/** Exposed server statistics. */
-	private final KvStateRequestStats stats;
-
-	/**
-	 * Create the handler.
-	 *
-	 * @param serializer the serializer used to (de-)serialize messages
-	 * @param stats statistics collector
-	 */
-	public AbstractServerHandler(
-			final AbstractServerBase<REQ, RESP> server,
-			final MessageSerializer<REQ, RESP> serializer,
-			final KvStateRequestStats stats) {
-
-		this.server = Preconditions.checkNotNull(server);
-		this.serializer = Preconditions.checkNotNull(serializer);
-		this.queryExecutor = server.getQueryExecutor();
-		this.stats = Preconditions.checkNotNull(stats);
-	}
-
-	protected String getServerName() {
-		return server.getServerName();
-	}
-
-	@Override
-	public void channelActive(ChannelHandlerContext ctx) throws Exception {
-		stats.reportActiveConnection();
-	}
-
-	@Override
-	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-		stats.reportInactiveConnection();
-	}
-
-	@Override
-	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-		REQ request = null;
-		long requestId = -1L;
-
-		try {
-			final ByteBuf buf = (ByteBuf) msg;
-			final MessageType msgType = MessageSerializer.deserializeHeader(buf);
-
-			requestId = MessageSerializer.getRequestId(buf);
-
-			if (msgType == MessageType.REQUEST) {
-
-				// ------------------------------------------------------------
-				// MessageBody
-				// ------------------------------------------------------------
-				request = serializer.deserializeRequest(buf);
-				stats.reportRequest();
-
-				// Execute actual query async, because it is possibly
-				// blocking (e.g. file I/O).
-				//
-				// A submission failure is not treated as fatal.
-				queryExecutor.submit(new AsyncRequestTask<>(this, ctx, requestId, request, stats));
-
-			} else {
-				// ------------------------------------------------------------
-				// Unexpected
-				// ------------------------------------------------------------
-
-				final String errMsg = "Unexpected message type " + msgType + ". Expected " + MessageType.REQUEST + ".";
-				final ByteBuf failure = MessageSerializer.serializeServerFailure(ctx.alloc(), new IllegalArgumentException(errMsg));
-
-				LOG.debug(errMsg);
-				ctx.writeAndFlush(failure);
-			}
-		} catch (Throwable t) {
-			final String stringifiedCause = ExceptionUtils.stringifyException(t);
-
-			String errMsg;
-			ByteBuf err;
-			if (request != null) {
-				errMsg = "Failed request with ID " + requestId + ". Caused by: " + stringifiedCause;
-				err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg));
-				stats.reportFailedRequest();
-			} else {
-				errMsg = "Failed incoming message. Caused by: " + stringifiedCause;
-				err = MessageSerializer.serializeServerFailure(ctx.alloc(), new RuntimeException(errMsg));
-			}
-
-			LOG.debug(errMsg);
-			ctx.writeAndFlush(err);
-
-		} finally {
-			// IMPORTANT: We have to always recycle the incoming buffer.
-			// Otherwise we will leak memory out of Netty's buffer pool.
-			//
-			// If any operation ever holds on to the buffer, it is the
-			// responsibility of that operation to retain the buffer and
-			// release it later.
-			ReferenceCountUtil.release(msg);
-		}
-	}
-
-	@Override
-	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-		final String msg = "Exception in server pipeline. Caused by: " + ExceptionUtils.stringifyException(cause);
-		final ByteBuf err = serializer.serializeServerFailure(ctx.alloc(), new RuntimeException(msg));
-
-		LOG.debug(msg);
-		ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE);
-	}
-
-	/**
-	 * Handles an incoming request and returns a {@link CompletableFuture} containing the corresponding response.
-	 *
-	 * <p><b>NOTE:</b> This method is called by multiple threads.
-	 *
-	 * @param requestId the id of the received request to be handled.
-	 * @param request the request to be handled.
-	 * @return A future with the response to be forwarded to the client.
-	 */
-	public abstract CompletableFuture<RESP> handleRequest(final long requestId, final REQ request);
-
-	/**
-	 * Shuts down any handler specific resources, e.g. thread pools etc.
-	 */
-	public abstract void shutdown();
-
-	/**
-	 * Task to execute the actual query against the {@link InternalKvState} instance.
-	 */
-	private static class AsyncRequestTask<REQ extends MessageBody, RESP extends MessageBody> implements Runnable {
-
-		private final AbstractServerHandler<REQ, RESP> handler;
-
-		private final ChannelHandlerContext ctx;
-
-		private final long requestId;
-
-		private final REQ request;
-
-		private final KvStateRequestStats stats;
-
-		private final long creationNanos;
-
-		AsyncRequestTask(
-				final AbstractServerHandler<REQ, RESP> handler,
-				final ChannelHandlerContext ctx,
-				final long requestId,
-				final REQ request,
-				final KvStateRequestStats stats) {
-
-			this.handler = Preconditions.checkNotNull(handler);
-			this.ctx = Preconditions.checkNotNull(ctx);
-			this.requestId = requestId;
-			this.request = Preconditions.checkNotNull(request);
-			this.stats = Preconditions.checkNotNull(stats);
-			this.creationNanos = System.nanoTime();
-		}
-
-		@Override
-		public void run() {
-
-			if (!ctx.channel().isActive()) {
-				return;
-			}
-
-			handler.handleRequest(requestId, request).whenComplete((resp, throwable) -> {
-				try {
-					if (throwable != null) {
-						throw throwable instanceof CompletionException
-								? throwable.getCause()
-								: throwable;
-					}
-
-					if (resp == null) {
-						throw new BadRequestException(handler.getServerName(), "NULL returned for request with ID " + requestId + ".");
-					}
-
-					final ByteBuf serialResp = MessageSerializer.serializeResponse(ctx.alloc(), requestId, resp);
-
-					int highWatermark = ctx.channel().config().getWriteBufferHighWaterMark();
-
-					ChannelFuture write;
-					if (serialResp.readableBytes() <= highWatermark) {
-						write = ctx.writeAndFlush(serialResp);
-					} else {
-						write = ctx.writeAndFlush(new ChunkedByteBuf(serialResp, highWatermark));
-					}
-					write.addListener(new RequestWriteListener());
-
-				} catch (BadRequestException e) {
-					try {
-						stats.reportFailedRequest();
-						final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, e);
-						ctx.writeAndFlush(err);
-					} catch (IOException io) {
-						LOG.error("Failed to respond with the error after failed request", io);
-					}
-				} catch (Throwable t) {
-					try {
-						stats.reportFailedRequest();
-
-						final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
-						final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg));
-						ctx.writeAndFlush(err);
-					} catch (IOException io) {
-						LOG.error("Failed to respond with the error after failed request", io);
-					}
-				}
-			});
-		}
-
-		@Override
-		public String toString() {
-			return "AsyncRequestTask{" +
-					"requestId=" + requestId +
-					", request=" + request +
-					'}';
-		}
-
-		/**
-		 * Callback after query result has been written.
-		 *
-		 * <p>Gathers stats and logs errors.
-		 */
-		private class RequestWriteListener implements ChannelFutureListener {
-
-			@Override
-			public void operationComplete(ChannelFuture future) throws Exception {
-				long durationNanos = System.nanoTime() - creationNanos;
-				long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
-
-				if (future.isSuccess()) {
-					LOG.debug("Request {} was successfully answered after {} ms.", request, durationMillis);
-					stats.reportSuccessfulRequest(durationMillis);
-				} else {
-					LOG.debug("Request {} failed after {} ms : ", request, durationMillis, future.cause());
-					stats.reportFailedRequest();
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java
deleted file mode 100644
index 3c0c484..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Base class for exceptions thrown during querying Flink's managed state.
- */
-@Internal
-public class BadRequestException extends Exception {
-
-	private static final long serialVersionUID = 3458743952407632903L;
-
-	public BadRequestException(String serverName, String message) {
-		super(Preconditions.checkNotNull(serverName) + " : " + message);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
deleted file mode 100644
index 9c56025..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-
-/**
- * A {@link ByteBuf} instance to be consumed in chunks by {@link ChunkedWriteHandler},
- * respecting the high and low watermarks.
- *
- * @see <a href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High Watermarks</a>
- */
-@Internal
-public class ChunkedByteBuf implements ChunkedInput<ByteBuf> {
-
-	/** The buffer to chunk. */
-	private final ByteBuf buf;
-
-	/** Size of chunks. */
-	private final int chunkSize;
-
-	/** Closed flag. */
-	private boolean isClosed;
-
-	/** End of input flag. */
-	private boolean isEndOfInput;
-
-	public ChunkedByteBuf(ByteBuf buf, int chunkSize) {
-		this.buf = Preconditions.checkNotNull(buf, "Buffer");
-		Preconditions.checkArgument(chunkSize > 0, "Non-positive chunk size");
-		this.chunkSize = chunkSize;
-	}
-
-	@Override
-	public boolean isEndOfInput() throws Exception {
-		return isClosed || isEndOfInput;
-	}
-
-	@Override
-	public void close() throws Exception {
-		if (!isClosed) {
-			// If we did not consume the whole buffer yet, we have to release
-			// it here. Otherwise, it's the responsibility of the consumer.
-			if (!isEndOfInput) {
-				buf.release();
-			}
-
-			isClosed = true;
-		}
-	}
-
-	@Override
-	public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
-		if (isClosed) {
-			return null;
-		} else if (buf.readableBytes() <= chunkSize) {
-			isEndOfInput = true;
-
-			// Don't retain as the consumer is responsible to release it
-			return buf.slice();
-		} else {
-			// Return a chunk sized slice of the buffer. The ref count is
-			// shared with the original buffer. That's why we need to retain
-			// a reference here.
-			return buf.readSlice(chunkSize).retain();
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "ChunkedByteBuf{" +
-				"buf=" + buf +
-				", chunkSize=" + chunkSize +
-				", isClosed=" + isClosed +
-				", isEndOfInput=" + isEndOfInput +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
deleted file mode 100644
index e6d59de..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ /dev/null
@@ -1,537 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.messages.MessageBody;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
-import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayDeque;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * The base class for every client in the queryable state module.
- * It is using pure netty to send and receive messages of type {@link MessageBody}.
- *
- * @param <REQ> the type of request the client will send.
- * @param <RESP> the type of response the client expects to receive.
- */
-@Internal
-public class Client<REQ extends MessageBody, RESP extends MessageBody> {
-
-	/** The name of the client. Used for logging and stack traces. */
-	private final String clientName;
-
-	/** Netty's Bootstrap. */
-	private final Bootstrap bootstrap;
-
-	/** The serializer to be used for (de-)serializing messages. */
-	private final MessageSerializer<REQ, RESP> messageSerializer;
-
-	/** Statistics tracker. */
-	private final KvStateRequestStats stats;
-
-	/** Established connections. */
-	private final Map<KvStateServerAddress, EstablishedConnection> establishedConnections = new ConcurrentHashMap<>();
-
-	/** Pending connections. */
-	private final Map<KvStateServerAddress, PendingConnection> pendingConnections = new ConcurrentHashMap<>();
-
-	/** Atomic shut down flag. */
-	private final AtomicBoolean shutDown = new AtomicBoolean();
-
-	/**
-	 * Creates a client with the specified number of event loop threads.
-	 *
-	 * @param clientName the name of the client.
-	 * @param numEventLoopThreads number of event loop threads (minimum 1).
-	 * @param serializer the serializer used to (de-)serialize messages.
-	 * @param stats the statistics collector.
-	 */
-	public Client(
-			final String clientName,
-			final int numEventLoopThreads,
-			final MessageSerializer<REQ, RESP> serializer,
-			final KvStateRequestStats stats) {
-
-		Preconditions.checkArgument(numEventLoopThreads >= 1,
-				"Non-positive number of event loop threads.");
-
-		this.clientName = Preconditions.checkNotNull(clientName);
-		this.messageSerializer = Preconditions.checkNotNull(serializer);
-		this.stats = Preconditions.checkNotNull(stats);
-
-		final ThreadFactory threadFactory = new ThreadFactoryBuilder()
-				.setDaemon(true)
-				.setNameFormat("Flink " + clientName + " Event Loop Thread %d")
-				.build();
-
-		final EventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
-		final ByteBufAllocator bufferPool = new NettyBufferPool(numEventLoopThreads);
-
-		this.bootstrap = new Bootstrap()
-				.group(nioGroup)
-				.channel(NioSocketChannel.class)
-				.option(ChannelOption.ALLOCATOR, bufferPool)
-				.handler(new ChannelInitializer<SocketChannel>() {
-					@Override
-					protected void initChannel(SocketChannel channel) throws Exception {
-						channel.pipeline()
-								.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
-								.addLast(new ChunkedWriteHandler());
-					}
-				});
-	}
-
-	public String getClientName() {
-		return clientName;
-	}
-
-	public CompletableFuture<RESP> sendRequest(final KvStateServerAddress serverAddress, final REQ request) {
-		if (shutDown.get()) {
-			return FutureUtils.getFailedFuture(new IllegalStateException("Shut down"));
-		}
-
-		EstablishedConnection connection = establishedConnections.get(serverAddress);
-		if (connection != null) {
-			return connection.sendRequest(request);
-		} else {
-			PendingConnection pendingConnection = pendingConnections.get(serverAddress);
-			if (pendingConnection != null) {
-				// There was a race, use the existing pending connection.
-				return pendingConnection.sendRequest(request);
-			} else {
-				// We try to connect to the server.
-				PendingConnection pending = new PendingConnection(serverAddress, messageSerializer);
-				PendingConnection previous = pendingConnections.putIfAbsent(serverAddress, pending);
-
-				if (previous == null) {
-					// OK, we are responsible to connect.
-					bootstrap.connect(serverAddress.getHost(), serverAddress.getPort()).addListener(pending);
-					return pending.sendRequest(request);
-				} else {
-					// There was a race, use the existing pending connection.
-					return previous.sendRequest(request);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Shuts down the client and closes all connections.
-	 *
-	 * <p>After a call to this method, all returned futures will be failed.
-	 */
-	public void shutdown() {
-		if (shutDown.compareAndSet(false, true)) {
-			for (Map.Entry<KvStateServerAddress, EstablishedConnection> conn : establishedConnections.entrySet()) {
-				if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
-					conn.getValue().close();
-				}
-			}
-
-			for (Map.Entry<KvStateServerAddress, PendingConnection> conn : pendingConnections.entrySet()) {
-				if (pendingConnections.remove(conn.getKey()) != null) {
-					conn.getValue().close();
-				}
-			}
-
-			if (bootstrap != null) {
-				EventLoopGroup group = bootstrap.group();
-				if (group != null) {
-					group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
-				}
-			}
-		}
-	}
-
-	/**
-	 * A pending connection that is in the process of connecting.
-	 */
-	private class PendingConnection implements ChannelFutureListener {
-
-		/** Lock to guard the connect call, channel hand in, etc. */
-		private final Object connectLock = new Object();
-
-		/** Address of the server we are connecting to. */
-		private final KvStateServerAddress serverAddress;
-
-		private final MessageSerializer<REQ, RESP> serializer;
-
-		/** Queue of requests while connecting. */
-		private final ArrayDeque<PendingRequest> queuedRequests = new ArrayDeque<>();
-
-		/** The established connection after the connect succeeds. */
-		private EstablishedConnection established;
-
-		/** Closed flag. */
-		private boolean closed;
-
-		/** Failure cause if something goes wrong. */
-		private Throwable failureCause;
-
-		/**
-		 * Creates a pending connection to the given server.
-		 *
-		 * @param serverAddress Address of the server to connect to.
-		 */
-		private PendingConnection(
-				final KvStateServerAddress serverAddress,
-				final MessageSerializer<REQ, RESP> serializer) {
-			this.serverAddress = serverAddress;
-			this.serializer = serializer;
-		}
-
-		@Override
-		public void operationComplete(ChannelFuture future) throws Exception {
-			if (future.isSuccess()) {
-				handInChannel(future.channel());
-			} else {
-				close(future.cause());
-			}
-		}
-
-		/**
-		 * Returns a future holding the serialized request result.
-		 *
-		 * <p>If the channel has been established, forward the call to the
-		 * established channel, otherwise queue it for when the channel is
-		 * handed in.
-		 *
-		 * @param request the request to be sent.
-		 * @return Future holding the serialized result
-		 */
-		public CompletableFuture<RESP> sendRequest(REQ request) {
-			synchronized (connectLock) {
-				if (failureCause != null) {
-					return FutureUtils.getFailedFuture(failureCause);
-				} else if (closed) {
-					return FutureUtils.getFailedFuture(new ClosedChannelException());
-				} else {
-					if (established != null) {
-						return established.sendRequest(request);
-					} else {
-						// Queue this and handle when connected
-						final PendingRequest pending = new PendingRequest(request);
-						queuedRequests.add(pending);
-						return pending;
-					}
-				}
-			}
-		}
-
-		/**
-		 * Hands in a channel after a successful connection.
-		 *
-		 * @param channel Channel to hand in
-		 */
-		private void handInChannel(Channel channel) {
-			synchronized (connectLock) {
-				if (closed || failureCause != null) {
-					// Close the channel and we are done. Any queued requests
-					// are removed on the close/failure call and after that no
-					// new ones can be enqueued.
-					channel.close();
-				} else {
-					established = new EstablishedConnection(serverAddress, serializer, channel);
-
-					while (!queuedRequests.isEmpty()) {
-						final PendingRequest pending = queuedRequests.poll();
-
-						established.sendRequest(pending.request)
-								.thenAccept(resp -> pending.complete(resp))
-								.exceptionally(throwable -> {
-									pending.completeExceptionally(throwable);
-									return null;
-						});
-					}
-
-					// Publish the channel for the general public
-					establishedConnections.put(serverAddress, established);
-					pendingConnections.remove(serverAddress);
-
-					// Re-check the shut down flag for a possible race with shut down. We
-					// don't want any lingering connections after shut down,
-					// which can happen if we don't check this here.
-					if (shutDown.get()) {
-						if (establishedConnections.remove(serverAddress, established)) {
-							established.close();
-						}
-					}
-				}
-			}
-		}
-
-		/**
-		 * Close the connecting channel with a ClosedChannelException.
-		 */
-		private void close() {
-			close(new ClosedChannelException());
-		}
-
-		/**
-		 * Close the connecting channel with the given cause (must not be {@code null}
-		 * while requests are still queued) or forward to the established channel.
-		 */
-		private void close(Throwable cause) {
-			synchronized (connectLock) {
-				if (!closed) {
-					if (failureCause == null) {
-						failureCause = cause;
-					}
-
-					if (established != null) {
-						established.close();
-					} else {
-						PendingRequest pending;
-						while ((pending = queuedRequests.poll()) != null) {
-							pending.completeExceptionally(cause);
-						}
-					}
-					closed = true;
-				}
-			}
-		}
-
-		@Override
-		public String toString() {
-			synchronized (connectLock) {
-				return "PendingConnection{" +
-						"serverAddress=" + serverAddress +
-						", queuedRequests=" + queuedRequests.size() +
-						", established=" + (established != null) +
-						", closed=" + closed +
-						'}';
-			}
-		}
-
-		/**
-		 * A pending request queued while the channel is connecting.
-		 */
-		private final class PendingRequest extends CompletableFuture<RESP> {
-
-			private final REQ request;
-
-			private PendingRequest(REQ request) {
-				this.request = request;
-			}
-		}
-	}
-
-	/**
-	 * An established connection that wraps the actual channel instance and is
-	 * registered at the {@link ClientHandler} for callbacks.
-	 */
-	private class EstablishedConnection implements ClientHandlerCallback<RESP> {
-
-		/** Address of the server we are connected to. */
-		private final KvStateServerAddress serverAddress;
-
-		/** The actual TCP channel. */
-		private final Channel channel;
-
-		/** Pending requests keyed by request ID. */
-		private final ConcurrentHashMap<Long, TimestampedCompletableFuture> pendingRequests = new ConcurrentHashMap<>();
-
-		/** Current request number used to assign unique request IDs. */
-		private final AtomicLong requestCount = new AtomicLong();
-
-		/** Reference to a failure that was reported by the channel. */
-		private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
-
-		/**
-		 * Creates an established connection with the given channel.
-		 *
-		 * @param serverAddress Address of the server connected to
-		 * @param channel The actual TCP channel
-		 */
-		EstablishedConnection(
-				final KvStateServerAddress serverAddress,
-				final MessageSerializer<REQ, RESP> serializer,
-				final Channel channel) {
-
-			this.serverAddress = Preconditions.checkNotNull(serverAddress);
-			this.channel = Preconditions.checkNotNull(channel);
-
-			// Add the client handler with the callback
-			channel.pipeline().addLast(
-					getClientName() + " Handler",
-					new ClientHandler<>(clientName, serializer, this)
-			);
-
-			stats.reportActiveConnection();
-		}
-
-		/**
-		 * Close the channel with a ClosedChannelException.
-		 */
-		void close() {
-			close(new ClosedChannelException());
-		}
-
-		/**
-		 * Close the channel with a cause.
-		 *
-		 * @param cause The cause to close the channel with.
-		 * @return Channel close future
-		 */
-		private boolean close(Throwable cause) {
-			if (failureCause.compareAndSet(null, cause)) {
-				channel.close();
-				stats.reportInactiveConnection();
-
-				for (long requestId : pendingRequests.keySet()) {
-					TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-					if (pending != null && pending.completeExceptionally(cause)) {
-						stats.reportFailedRequest();
-					}
-				}
-				return true;
-			}
-			return false;
-		}
-
-		/**
-		 * Returns a future holding the serialized request result.
-		 * @param request the request to be sent.
-		 * @return Future holding the serialized result
-		 */
-		CompletableFuture<RESP> sendRequest(REQ request) {
-			TimestampedCompletableFuture requestPromiseTs =
-					new TimestampedCompletableFuture(System.nanoTime());
-			try {
-				final long requestId = requestCount.getAndIncrement();
-				pendingRequests.put(requestId, requestPromiseTs);
-
-				stats.reportRequest();
-
-				ByteBuf buf = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
-
-				channel.writeAndFlush(buf).addListener((ChannelFutureListener) future -> {
-					if (!future.isSuccess()) {
-						// Fail promise if not failed to write
-						TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-						if (pending != null && pending.completeExceptionally(future.cause())) {
-							stats.reportFailedRequest();
-						}
-					}
-				});
-
-				// Check failure for possible race. We don't want any lingering
-				// promises after a failure, which can happen if we don't check
-				// this here. Note that close is treated as a failure as well.
-				Throwable failure = failureCause.get();
-				if (failure != null) {
-					// Remove from pending requests to guard against concurrent
-					// removal and to make sure that we only count it once as failed.
-					TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-					if (pending != null && pending.completeExceptionally(failure)) {
-						stats.reportFailedRequest();
-					}
-				}
-			} catch (Throwable t) {
-				requestPromiseTs.completeExceptionally(t);
-			}
-
-			return requestPromiseTs;
-		}
-
-		@Override
-		public void onRequestResult(long requestId, RESP response) {
-			TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-			if (pending != null && pending.complete(response)) {
-				long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
-				stats.reportSuccessfulRequest(durationMillis);
-			}
-		}
-
-		@Override
-		public void onRequestFailure(long requestId, Throwable cause) {
-			TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-			if (pending != null && pending.completeExceptionally(cause)) {
-				stats.reportFailedRequest();
-			}
-		}
-
-		@Override
-		public void onFailure(Throwable cause) {
-			if (close(cause)) {
-				// Remove from established channels, otherwise future
-				// requests will be handled by this failed channel.
-				establishedConnections.remove(serverAddress, this);
-			}
-		}
-
-		@Override
-		public String toString() {
-			return "EstablishedConnection{" +
-					"serverAddress=" + serverAddress +
-					", channel=" + channel +
-					", pendingRequests=" + pendingRequests.size() +
-					", requestCount=" + requestCount +
-					", failureCause=" + failureCause +
-					'}';
-		}
-
-		/**
-		 * Pair of promise and a timestamp.
-		 */
-		private class TimestampedCompletableFuture extends CompletableFuture<RESP> {
-
-			private final long timestampInNanos;
-
-			TimestampedCompletableFuture(long timestampInNanos) {
-				this.timestampInNanos = timestampInNanos;
-			}
-
-			public long getTimestamp() {
-				return timestampInNanos;
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
deleted file mode 100644
index fc9b1d4..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.messages.MessageBody;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.queryablestate.network.messages.RequestFailure;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.channels.ClosedChannelException;
-
-/**
- * The handler used by a {@link Client} to handling incoming messages.
- *
- * @param <REQ> the type of request the client will send.
- * @param <RESP> the type of response the client expects to receive.
- */
-@Internal
-public class ClientHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ClientHandler.class);
-
-	private final String clientName;
-
-	private final MessageSerializer<REQ, RESP> serializer;
-
-	private final ClientHandlerCallback<RESP> callback;
-
-	/**
-	 * Creates a handler with the callback.
-	 *
-	 * @param clientName the name of the client.
-	 * @param serializer the serializer used to (de-)serialize messages.
-	 * @param callback Callback for responses.
-	 */
-	public ClientHandler(
-			final String clientName,
-			final MessageSerializer<REQ, RESP> serializer,
-			final ClientHandlerCallback<RESP> callback) {
-
-		this.clientName = Preconditions.checkNotNull(clientName);
-		this.serializer = Preconditions.checkNotNull(serializer);
-		this.callback = Preconditions.checkNotNull(callback);
-	}
-
-	@Override
-	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-		try {
-			ByteBuf buf = (ByteBuf) msg;
-			MessageType msgType = MessageSerializer.deserializeHeader(buf);
-
-			if (msgType == MessageType.REQUEST_RESULT) {
-				long requestId = MessageSerializer.getRequestId(buf);
-				RESP result = serializer.deserializeResponse(buf);
-				callback.onRequestResult(requestId, result);
-			} else if (msgType == MessageType.REQUEST_FAILURE) {
-				RequestFailure failure = MessageSerializer.deserializeRequestFailure(buf);
-				callback.onRequestFailure(failure.getRequestId(), failure.getCause());
-			} else if (msgType == MessageType.SERVER_FAILURE) {
-				throw MessageSerializer.deserializeServerFailure(buf);
-			} else {
-				throw new IllegalStateException("Unexpected response type '" + msgType + "'");
-			}
-		} catch (Throwable t1) {
-			try {
-				callback.onFailure(t1);
-			} catch (Throwable t2) {
-				LOG.error("Failed to notify callback about failure", t2);
-			}
-		} finally {
-			ReferenceCountUtil.release(msg);
-		}
-	}
-
-	@Override
-	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-		try {
-			callback.onFailure(cause);
-		} catch (Throwable t) {
-			LOG.error("Failed to notify callback about failure", t);
-		}
-	}
-
-	@Override
-	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-		// Only the client is expected to close the channel. Otherwise it
-		// indicates a failure. Note that this will be invoked in both cases
-		// though. If the callback closed the channel, the callback must be
-		// ignored.
-		try {
-			callback.onFailure(new ClosedChannelException());
-		} catch (Throwable t) {
-			LOG.error("Failed to notify callback about failure", t);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
deleted file mode 100644
index 00ce1ed..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.messages.MessageBody;
-
-/**
- * Callback for {@link ClientHandler}.
- */
-@Internal
-public interface ClientHandlerCallback<RESP extends MessageBody> {
-
-	/**
-	 * Called on a successful request.
-	 *
-	 * @param requestId			ID of the request
-	 * @param response			The received response
-	 */
-	void onRequestResult(long requestId, RESP response);
-
-	/**
-	 * Called on a failed request.
-	 *
-	 * @param requestId ID of the request
-	 * @param cause     Cause of the request failure
-	 */
-	void onRequestFailure(long requestId, Throwable cause);
-
-	/**
-	 * Called on any failure, which is not related to a specific request.
-	 *
-	 * <p>This can be for example a caught Exception in the channel pipeline
-	 * or an unexpected channel close.
-	 *
-	 * @param cause Cause of the failure
-	 */
-	void onFailure(Throwable cause);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
deleted file mode 100644
index f26c267..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network.messages;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * The base class for every message exchanged during the communication between
- * {@link org.apache.flink.queryablestate.network.Client client} and
- * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
- *
- * <p>Every such message should also have a {@link MessageDeserializer}.
- */
-@Internal
-public abstract class MessageBody {
-
-	/**
-	 * Serializes the message into a byte array.
-	 * @return A byte array with the serialized content of the message.
-	 */
-	public abstract byte[] serialize();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
deleted file mode 100644
index 436fb82..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network.messages;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-
-/**
- * A utility used to deserialize a {@link MessageBody message}.
- * @param <M> The type of the message to be deserialized.
- *           It has to extend {@link MessageBody}
- */
-@Internal
-public interface MessageDeserializer<M extends MessageBody> {
-
-	/**
-	 * Deserializes a message contained in a byte buffer.
-	 * @param buf the buffer containing the message.
-	 * @return The deserialized message.
-	 */
-	M deserializeMessage(ByteBuf buf);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
deleted file mode 100644
index c0a0d32..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network.messages;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufOutputStream;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-
-/**
- * Serialization and deserialization of messages exchanged between
- * {@link org.apache.flink.queryablestate.network.Client client} and
- * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
- *
- * <p>The binary messages have the following format:
- *
- * <pre>
- *                     <------ Frame ------------------------->
- *                    +----------------------------------------+
- *                    |        HEADER (8)      | PAYLOAD (VAR) |
- * +------------------+----------------------------------------+
- * | FRAME LENGTH (4) | VERSION (4) | TYPE (4) | CONTENT (VAR) |
- * +------------------+----------------------------------------+
- * </pre>
- *
- * <p>The concrete content of a message depends on the {@link MessageType}.
- *
- * @param <REQ>		Type of the requests of the protocol.
- * @param <RESP>	Type of the responses of the protocol.
- */
-@Internal
-public final class MessageSerializer<REQ extends MessageBody, RESP extends MessageBody> {
-
-	/** The serialization version ID. */
-	private static final int VERSION = 0x79a1b710;
-
-	/** Byte length of the header. */
-	private static final int HEADER_LENGTH = 2 * Integer.BYTES;
-
-	/** Byte length of the request id. */
-	private static final int REQUEST_ID_SIZE = Long.BYTES;
-
-	/** The constructor of the {@link MessageBody client requests}. Used for deserialization. */
-	private final MessageDeserializer<REQ> requestDeserializer;
-
-	/** The constructor of the {@link MessageBody server responses}. Used for deserialization. */
-	private final MessageDeserializer<RESP> responseDeserializer;
-
-	public MessageSerializer(MessageDeserializer<REQ> requestDeser, MessageDeserializer<RESP> responseDeser) {
-		requestDeserializer = Preconditions.checkNotNull(requestDeser);
-		responseDeserializer = Preconditions.checkNotNull(responseDeser);
-	}
-
-	// ------------------------------------------------------------------------
-	// Serialization
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Serializes the request sent to the
-	 * {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
-	 *
-	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
-	 * @param requestId		The id of the request to which the message refers to.
-	 * @param request		The request to be serialized.
-	 * @return A {@link ByteBuf} containing the serialized message.
-	 */
-	public static <REQ extends MessageBody> ByteBuf serializeRequest(
-			final ByteBufAllocator alloc,
-			final long requestId,
-			final REQ request) {
-		Preconditions.checkNotNull(request);
-		return writePayload(alloc, requestId, MessageType.REQUEST, request.serialize());
-	}
-
-	/**
-	 * Serializes the response sent to the
-	 * {@link org.apache.flink.queryablestate.network.Client}.
-	 *
-	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
-	 * @param requestId		The id of the request to which the message refers to.
-	 * @param response		The response to be serialized.
-	 * @return A {@link ByteBuf} containing the serialized message.
-	 */
-	public static <RESP extends MessageBody> ByteBuf serializeResponse(
-			final ByteBufAllocator alloc,
-			final long requestId,
-			final RESP response) {
-		Preconditions.checkNotNull(response);
-		return writePayload(alloc, requestId, MessageType.REQUEST_RESULT, response.serialize());
-	}
-
-	/**
-	 * Serializes the exception containing the failure message sent to the
-	 * {@link org.apache.flink.queryablestate.network.Client} in case of
-	 * protocol related errors.
-	 *
-	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
-	 * @param requestId		The id of the request to which the message refers to.
-	 * @param cause			The exception thrown at the server.
-	 * @return A {@link ByteBuf} containing the serialized message.
-	 */
-	public static ByteBuf serializeRequestFailure(
-			final ByteBufAllocator alloc,
-			final long requestId,
-			final Throwable cause) throws IOException {
-
-		final ByteBuf buf = alloc.ioBuffer();
-
-		// Frame length is set at the end
-		buf.writeInt(0);
-		writeHeader(buf, MessageType.REQUEST_FAILURE);
-		buf.writeLong(requestId);
-
-		try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
-				ObjectOutput out = new ObjectOutputStream(bbos)) {
-			out.writeObject(cause);
-		}
-
-		// Set frame length
-		int frameLength = buf.readableBytes() - Integer.BYTES;
-		buf.setInt(0, frameLength);
-		return buf;
-	}
-
-	/**
-	 * Serializes the failure message sent to the
-	 * {@link org.apache.flink.queryablestate.network.Client} in case of
-	 * server related errors.
-	 *
-	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
-	 * @param cause			The exception thrown at the server.
-	 * @return		The failure message.
-	 */
-	public static ByteBuf serializeServerFailure(
-			final ByteBufAllocator alloc,
-			final Throwable cause) throws IOException {
-
-		final ByteBuf buf = alloc.ioBuffer();
-
-		// Frame length is set at end
-		buf.writeInt(0);
-		writeHeader(buf, MessageType.SERVER_FAILURE);
-
-		try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
-				ObjectOutput out = new ObjectOutputStream(bbos)) {
-			out.writeObject(cause);
-		}
-
-		// Set frame length
-		int frameLength = buf.readableBytes() - Integer.BYTES;
-		buf.setInt(0, frameLength);
-		return buf;
-	}
-
-	/**
-	 * Helper for serializing the header.
-	 *
-	 * @param buf         The {@link ByteBuf} to serialize the header into.
-	 * @param messageType The {@link MessageType} of the message this header refers to.
-	 */
-	private static void writeHeader(final ByteBuf buf, final MessageType messageType) {
-		buf.writeInt(VERSION);
-		buf.writeInt(messageType.ordinal());
-	}
-
-	/**
-	 * Helper for serializing the messages.
-	 *
-	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
-	 * @param requestId		The id of the request to which the message refers to.
-	 * @param messageType	The {@link MessageType type of the message}.
-	 * @param payload		The serialized version of the message.
-	 * @return A {@link ByteBuf} containing the serialized message.
-	 */
-	private static ByteBuf writePayload(
-			final ByteBufAllocator alloc,
-			final long requestId,
-			final MessageType messageType,
-			final byte[] payload) {
-
-		final int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + payload.length;
-		final ByteBuf buf = alloc.ioBuffer(frameLength + Integer.BYTES);
-
-		buf.writeInt(frameLength);
-		writeHeader(buf, messageType);
-		buf.writeLong(requestId);
-		buf.writeBytes(payload);
-		return buf;
-	}
-
-	// ------------------------------------------------------------------------
-	// Deserialization
-	// ------------------------------------------------------------------------
-
-	/**
-	 * De-serializes the header and returns the {@link MessageType}.
-	 * <pre>
-	 *  <b>The buffer is expected to be at the header position.</b>
-	 * </pre>
-	 * @param buf						The {@link ByteBuf} containing the serialized header.
-	 * @return							The message type.
-	 * @throws IllegalStateException	If unexpected message version or message type.
-	 */
-	public static MessageType deserializeHeader(final ByteBuf buf) {
-
-		// checking the version
-		int version = buf.readInt();
-		Preconditions.checkState(version == VERSION,
-				"Version Mismatch:  Found " + version + ", Expected: " + VERSION + '.');
-
-		// fetching the message type
-		int msgType = buf.readInt();
-		MessageType[] values = MessageType.values();
-		Preconditions.checkState(msgType >= 0 && msgType < values.length,
-				"Illegal message type with index " + msgType + '.');
-		return values[msgType];
-	}
-
-	/**
-	 * De-serializes the header and returns the {@link MessageType}.
-	 * <pre>
-	 *  <b>The buffer is expected to be at the request id position.</b>
-	 * </pre>
-	 * @param buf	The {@link ByteBuf} containing the serialized request id.
-	 * @return		The request id.
-	 */
-	public static long getRequestId(final ByteBuf buf) {
-		return buf.readLong();
-	}
-
-	/**
-	 * De-serializes the request sent to the
-	 * {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
-	 * <pre>
-	 *  <b>The buffer is expected to be at the request position.</b>
-	 * </pre>
-	 * @param buf	The {@link ByteBuf} containing the serialized request.
-	 * @return		The request.
-	 */
-	public REQ deserializeRequest(final ByteBuf buf) {
-		Preconditions.checkNotNull(buf);
-		return requestDeserializer.deserializeMessage(buf);
-	}
-
-	/**
-	 * De-serializes the response sent to the
-	 * {@link org.apache.flink.queryablestate.network.Client}.
-	 * <pre>
-	 *  <b>The buffer is expected to be at the response position.</b>
-	 * </pre>
-	 * @param buf	The {@link ByteBuf} containing the serialized response.
-	 * @return		The response.
-	 */
-	public RESP deserializeResponse(final ByteBuf buf) {
-		Preconditions.checkNotNull(buf);
-		return responseDeserializer.deserializeMessage(buf);
-	}
-
-	/**
-	 * De-serializes the {@link RequestFailure} sent to the
-	 * {@link org.apache.flink.queryablestate.network.Client} in case of
-	 * protocol related errors.
-	 * <pre>
-	 *  <b>The buffer is expected to be at the correct position.</b>
-	 * </pre>
-	 * @param buf	The {@link ByteBuf} containing the serialized failure message.
-	 * @return		The failure message.
-	 */
-	public static RequestFailure deserializeRequestFailure(final ByteBuf buf) throws IOException, ClassNotFoundException {
-		long requestId = buf.readLong();
-
-		Throwable cause;
-		try (ByteBufInputStream bis = new ByteBufInputStream(buf);
-				ObjectInputStream in = new ObjectInputStream(bis)) {
-			cause = (Throwable) in.readObject();
-		}
-		return new RequestFailure(requestId, cause);
-	}
-
-	/**
-	 * De-serializes the failure message sent to the
-	 * {@link org.apache.flink.queryablestate.network.Client} in case of
-	 * server related errors.
-	 * <pre>
-	 *  <b>The buffer is expected to be at the correct position.</b>
-	 * </pre>
-	 * @param buf	The {@link ByteBuf} containing the serialized failure message.
-	 * @return		The failure message.
-	 */
-	public static Throwable deserializeServerFailure(final ByteBuf buf) throws IOException, ClassNotFoundException {
-		try (ByteBufInputStream bis = new ByteBufInputStream(buf);
-				ObjectInputStream in = new ObjectInputStream(bis)) {
-			return (Throwable) in.readObject();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
deleted file mode 100644
index 562ce93..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network.messages;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * Expected message types during the communication between
- * {@link org.apache.flink.queryablestate.network.Client client} and
- * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
- */
-@Internal
-public enum MessageType {
-
-	/** The message is a request. */
-	REQUEST,
-
-	/** The message is a successful response. */
-	REQUEST_RESULT,
-
-	/** The message indicates a protocol-related failure. */
-	REQUEST_FAILURE,
-
-	/** The message indicates a server failure. */
-	SERVER_FAILURE
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
deleted file mode 100644
index 106199f..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network.messages;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * A message indicating a protocol-related error.
- */
-@Internal
-public class RequestFailure {
-
-	/** ID of the request responding to. */
-	private final long requestId;
-
-	/** Failure cause. Not allowed to be a user type. */
-	private final Throwable cause;
-
-	/**
-	 * Creates a failure response to a {@link MessageBody}.
-	 *
-	 * @param requestId ID for the request responding to
-	 * @param cause     Failure cause (not allowed to be a user type)
-	 */
-	public RequestFailure(long requestId, Throwable cause) {
-		this.requestId = requestId;
-		this.cause = cause;
-	}
-
-	/**
-	 * Returns the request ID responding to.
-	 *
-	 * @return Request ID responding to
-	 */
-	public long getRequestId() {
-		return requestId;
-	}
-
-	/**
-	 * Returns the failure cause.
-	 *
-	 * @return Failure cause
-	 */
-	public Throwable getCause() {
-		return cause;
-	}
-
-	@Override
-	public String toString() {
-		return "RequestFailure{" +
-				"requestId=" + requestId +
-				", cause=" + cause +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
deleted file mode 100644
index 055a5d0..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.server;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
-import org.apache.flink.queryablestate.UnknownKvStateIdException;
-import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
-import org.apache.flink.queryablestate.messages.KvStateResponse;
-import org.apache.flink.queryablestate.network.AbstractServerHandler;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-
-/**
- * This handler dispatches asynchronous tasks, which query {@link InternalKvState}
- * instances and write the result to the channel.
- *
- * <p>The network threads receive the message, deserialize it and dispatch the
- * query task. The actual query is handled in a separate thread as it might
- * otherwise block the network threads (file I/O etc.).
- */
-@Internal
-@ChannelHandler.Sharable
-public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class);
-
-	/** KvState registry holding references to the KvState instances. */
-	private final KvStateRegistry registry;
-
-	/**
-	 * Create the handler used by the {@link KvStateServerImpl}.
-	 *
-	 * @param server the {@link KvStateServerImpl} using the handler.
-	 * @param kvStateRegistry registry to query.
-	 * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages.
-	 * @param stats server statistics collector.
-	 */
-	public KvStateServerHandler(
-			final KvStateServerImpl server,
-			final KvStateRegistry kvStateRegistry,
-			final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer,
-			final KvStateRequestStats stats) {
-
-		super(server, serializer, stats);
-		this.registry = Preconditions.checkNotNull(kvStateRegistry);
-	}
-
-	@Override
-	public CompletableFuture<KvStateResponse> handleRequest(final long requestId, final KvStateInternalRequest request) {
-		final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
-
-		try {
-			final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
-			if (kvState == null) {
-				responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
-			} else {
-				byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
-
-				byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
-				if (serializedResult != null) {
-					responseFuture.complete(new KvStateResponse(serializedResult));
-				} else {
-					responseFuture.completeExceptionally(new UnknownKeyOrNamespaceException(getServerName()));
-				}
-			}
-			return responseFuture;
-		} catch (Throwable t) {
-			String errMsg = "Error while processing request with ID " + requestId +
-					". Caused by: " + ExceptionUtils.stringifyException(t);
-			responseFuture.completeExceptionally(new RuntimeException(errMsg));
-			return responseFuture;
-		}
-	}
-
-	@Override
-	public void shutdown() {
-		// do nothing
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
deleted file mode 100644
index dfca915..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.server;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
-import org.apache.flink.queryablestate.messages.KvStateResponse;
-import org.apache.flink.queryablestate.network.AbstractServerBase;
-import org.apache.flink.queryablestate.network.AbstractServerHandler;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServer;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.util.Iterator;
-
-/**
- * The default implementation of the {@link KvStateServer}.
- */
-@Internal
-public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class);
-
-	/** The {@link KvStateRegistry} to query for state instances. */
-	private final KvStateRegistry kvStateRegistry;
-
-	private final KvStateRequestStats stats;
-
-	private MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer;
-
-	/**
-	 * Creates the state server.
-	 *
-	 * <p>The server is instantiated using reflection by the
-	 * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)
-	 * QueryableStateUtils.createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)}.
-	 *
-	 * <p>The server needs to be started via {@link #start()} in order to bind
-	 * to the configured bind address.
-	 *
-	 * @param bindAddress the address to listen to.
-	 * @param bindPortIterator the port range to try to bind to.
-	 * @param numEventLoopThreads number of event loop threads.
-	 * @param numQueryThreads number of query threads.
-	 * @param kvStateRegistry {@link KvStateRegistry} to query for state instances.
-	 * @param stats the statistics collector.
-	 */
-	public KvStateServerImpl(
-			final InetAddress bindAddress,
-			final Iterator<Integer> bindPortIterator,
-			final Integer numEventLoopThreads,
-			final Integer numQueryThreads,
-			final KvStateRegistry kvStateRegistry,
-			final KvStateRequestStats stats) {
-
-		super("Queryable State Server", bindAddress, bindPortIterator, numEventLoopThreads, numQueryThreads);
-		this.stats = Preconditions.checkNotNull(stats);
-		this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
-	}
-
-	@Override
-	public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> initializeHandler() {
-		this.serializer = new MessageSerializer<>(
-				new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
-				new KvStateResponse.KvStateResponseDeserializer());
-		return new KvStateServerHandler(this, kvStateRegistry, serializer, stats);
-	}
-
-	public MessageSerializer<KvStateInternalRequest, KvStateResponse> getSerializer() {
-		Preconditions.checkState(serializer != null, "Server " + getServerName() + " has not been started.");
-		return serializer;
-	}
-
-	@Override
-	public void start() throws Throwable {
-		super.start();
-	}
-
-	@Override
-	public KvStateServerAddress getServerAddress() {
-		return super.getServerAddress();
-	}
-
-	@Override
-	public void shutdown() {
-		super.shutdown();
-	}
-}