You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/26 17:01:59 UTC

[10/13] flink git commit: [FLINK-7908][QS] Restructure the queryable state module.

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
new file mode 100644
index 0000000..fc9b1d4
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.network.messages.RequestFailure;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * The handler used by a {@link Client} to handling incoming messages.
+ *
+ * @param <REQ> the type of request the client will send.
+ * @param <RESP> the type of response the client expects to receive.
+ */
+@Internal
+public class ClientHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ClientHandler.class);
+
+	private final String clientName;
+
+	private final MessageSerializer<REQ, RESP> serializer;
+
+	private final ClientHandlerCallback<RESP> callback;
+
+	/**
+	 * Creates a handler with the callback.
+	 *
+	 * @param clientName the name of the client.
+	 * @param serializer the serializer used to (de-)serialize messages.
+	 * @param callback Callback for responses.
+	 */
+	public ClientHandler(
+			final String clientName,
+			final MessageSerializer<REQ, RESP> serializer,
+			final ClientHandlerCallback<RESP> callback) {
+
+		this.clientName = Preconditions.checkNotNull(clientName);
+		this.serializer = Preconditions.checkNotNull(serializer);
+		this.callback = Preconditions.checkNotNull(callback);
+	}
+
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+		try {
+			ByteBuf buf = (ByteBuf) msg;
+			MessageType msgType = MessageSerializer.deserializeHeader(buf);
+
+			if (msgType == MessageType.REQUEST_RESULT) {
+				long requestId = MessageSerializer.getRequestId(buf);
+				RESP result = serializer.deserializeResponse(buf);
+				callback.onRequestResult(requestId, result);
+			} else if (msgType == MessageType.REQUEST_FAILURE) {
+				RequestFailure failure = MessageSerializer.deserializeRequestFailure(buf);
+				callback.onRequestFailure(failure.getRequestId(), failure.getCause());
+			} else if (msgType == MessageType.SERVER_FAILURE) {
+				throw MessageSerializer.deserializeServerFailure(buf);
+			} else {
+				throw new IllegalStateException("Unexpected response type '" + msgType + "'");
+			}
+		} catch (Throwable t1) {
+			try {
+				callback.onFailure(t1);
+			} catch (Throwable t2) {
+				LOG.error("Failed to notify callback about failure", t2);
+			}
+		} finally {
+			ReferenceCountUtil.release(msg);
+		}
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		try {
+			callback.onFailure(cause);
+		} catch (Throwable t) {
+			LOG.error("Failed to notify callback about failure", t);
+		}
+	}
+
+	@Override
+	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+		// Only the client is expected to close the channel. Otherwise it
+		// indicates a failure. Note that this will be invoked in both cases
+		// though. If the callback closed the channel, the callback must be
+		// ignored.
+		try {
+			callback.onFailure(new ClosedChannelException());
+		} catch (Throwable t) {
+			LOG.error("Failed to notify callback about failure", t);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
new file mode 100644
index 0000000..00ce1ed
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+
+/**
+ * Callback for {@link ClientHandler}.
+ */
+@Internal
+public interface ClientHandlerCallback<RESP extends MessageBody> {
+
+	/**
+	 * Called on a successful request.
+	 *
+	 * @param requestId			ID of the request
+	 * @param response			The received response
+	 */
+	void onRequestResult(long requestId, RESP response);
+
+	/**
+	 * Called on a failed request.
+	 *
+	 * @param requestId ID of the request
+	 * @param cause     Cause of the request failure
+	 */
+	void onRequestFailure(long requestId, Throwable cause);
+
+	/**
+	 * Called on any failure, which is not related to a specific request.
+	 *
+	 * <p>This can be for example a caught Exception in the channel pipeline
+	 * or an unexpected channel close.
+	 *
+	 * @param cause Cause of the failure
+	 */
+	void onFailure(Throwable cause);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
new file mode 100644
index 0000000..5e014b8
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.CompositeByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Wrapper around Netty's {@link PooledByteBufAllocator} with strict control
+ * over the number of created arenas.
+ */
+public class NettyBufferPool implements ByteBufAllocator {
+
+	/** The wrapped buffer allocator. */
+	private final PooledByteBufAllocator alloc;
+
+	/**
+	 * Creates Netty's buffer pool with the specified number of direct arenas.
+	 *
+	 * @param numberOfArenas Number of arenas (recommended: 2 * number of task
+	 *                       slots)
+	 */
+	public NettyBufferPool(int numberOfArenas) {
+		checkArgument(numberOfArenas >= 1, "Number of arenas");
+
+		// We strictly prefer direct buffers and disallow heap allocations.
+		boolean preferDirect = true;
+
+		// Arenas allocate chunks of pageSize << maxOrder bytes. With these
+		// defaults, this results in chunks of 16 MB.
+		int pageSize = 8192;
+		int maxOrder = 11;
+
+		// Number of direct arenas. Each arena allocates a chunk of 16 MB, i.e.
+		// we allocate numDirectArenas * 16 MB of direct memory. This can grow
+		// to multiple chunks per arena during runtime, but this should only
+		// happen with a large amount of connections per task manager. We
+		// control the memory allocations with low/high watermarks when writing
+		// to the TCP channels. Chunks are allocated lazily.
+		int numDirectArenas = numberOfArenas;
+
+		// No heap arenas, please.
+		int numHeapArenas = 0;
+
+		this.alloc = new PooledByteBufAllocator(
+				preferDirect,
+				numHeapArenas,
+				numDirectArenas,
+				pageSize,
+				maxOrder);
+	}
+
+	// ------------------------------------------------------------------------
+	// Delegate calls to the allocated and prohibit heap buffer allocations
+	// ------------------------------------------------------------------------
+
+	@Override
+	public ByteBuf buffer() {
+		return alloc.buffer();
+	}
+
+	@Override
+	public ByteBuf buffer(int initialCapacity) {
+		return alloc.buffer(initialCapacity);
+	}
+
+	@Override
+	public ByteBuf buffer(int initialCapacity, int maxCapacity) {
+		return alloc.buffer(initialCapacity, maxCapacity);
+	}
+
+	@Override
+	public ByteBuf ioBuffer() {
+		return alloc.ioBuffer();
+	}
+
+	@Override
+	public ByteBuf ioBuffer(int initialCapacity) {
+		return alloc.ioBuffer(initialCapacity);
+	}
+
+	@Override
+	public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
+		return alloc.ioBuffer(initialCapacity, maxCapacity);
+	}
+
+	@Override
+	public ByteBuf heapBuffer() {
+		throw new UnsupportedOperationException("Heap buffer");
+	}
+
+	@Override
+	public ByteBuf heapBuffer(int initialCapacity) {
+		throw new UnsupportedOperationException("Heap buffer");
+	}
+
+	@Override
+	public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+		throw new UnsupportedOperationException("Heap buffer");
+	}
+
+	@Override
+	public ByteBuf directBuffer() {
+		return alloc.directBuffer();
+	}
+
+	@Override
+	public ByteBuf directBuffer(int initialCapacity) {
+		return alloc.directBuffer(initialCapacity);
+	}
+
+	@Override
+	public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
+		return alloc.directBuffer(initialCapacity, maxCapacity);
+	}
+
+	@Override
+	public CompositeByteBuf compositeBuffer() {
+		return alloc.compositeBuffer();
+	}
+
+	@Override
+	public CompositeByteBuf compositeBuffer(int maxNumComponents) {
+		return alloc.compositeBuffer(maxNumComponents);
+	}
+
+	@Override
+	public CompositeByteBuf compositeHeapBuffer() {
+		throw new UnsupportedOperationException("Heap buffer");
+	}
+
+	@Override
+	public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
+		throw new UnsupportedOperationException("Heap buffer");
+	}
+
+	@Override
+	public CompositeByteBuf compositeDirectBuffer() {
+		return alloc.compositeDirectBuffer();
+	}
+
+	@Override
+	public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
+		return alloc.compositeDirectBuffer(maxNumComponents);
+	}
+
+	@Override
+	public boolean isDirectBufferPooled() {
+		return alloc.isDirectBufferPooled();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
new file mode 100644
index 0000000..f26c267
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The base class for every message exchanged during the communication between
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
+ *
+ * <p>Every such message should also have a {@link MessageDeserializer}.
+ */
+@Internal
+public abstract class MessageBody {
+
+	/**
+	 * Serializes the message into a byte array.
+	 * @return A byte array with the serialized content of the message.
+	 */
+	public abstract byte[] serialize();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
new file mode 100644
index 0000000..436fb82
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+/**
+ * A utility used to deserialize a {@link MessageBody message}.
+ * @param <M> The type of the message to be deserialized.
+ *           It has to extend {@link MessageBody}
+ */
+@Internal
+public interface MessageDeserializer<M extends MessageBody> {
+
+	/**
+	 * Deserializes a message contained in a byte buffer.
+	 * @param buf the buffer containing the message.
+	 * @return The deserialized message.
+	 */
+	M deserializeMessage(ByteBuf buf);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
new file mode 100644
index 0000000..c0a0d32
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufOutputStream;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+
+/**
+ * Serialization and deserialization of messages exchanged between
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
+ *
+ * <p>The binary messages have the following format:
+ *
+ * <pre>
+ *                     <------ Frame ------------------------->
+ *                    +----------------------------------------+
+ *                    |        HEADER (8)      | PAYLOAD (VAR) |
+ * +------------------+----------------------------------------+
+ * | FRAME LENGTH (4) | VERSION (4) | TYPE (4) | CONTENT (VAR) |
+ * +------------------+----------------------------------------+
+ * </pre>
+ *
+ * <p>The concrete content of a message depends on the {@link MessageType}.
+ *
+ * @param <REQ>		Type of the requests of the protocol.
+ * @param <RESP>	Type of the responses of the protocol.
+ */
+@Internal
+public final class MessageSerializer<REQ extends MessageBody, RESP extends MessageBody> {
+
+	/** The serialization version ID. */
+	private static final int VERSION = 0x79a1b710;
+
+	/** Byte length of the header. */
+	private static final int HEADER_LENGTH = 2 * Integer.BYTES;
+
+	/** Byte length of the request id. */
+	private static final int REQUEST_ID_SIZE = Long.BYTES;
+
+	/** The constructor of the {@link MessageBody client requests}. Used for deserialization. */
+	private final MessageDeserializer<REQ> requestDeserializer;
+
+	/** The constructor of the {@link MessageBody server responses}. Used for deserialization. */
+	private final MessageDeserializer<RESP> responseDeserializer;
+
+	public MessageSerializer(MessageDeserializer<REQ> requestDeser, MessageDeserializer<RESP> responseDeser) {
+		requestDeserializer = Preconditions.checkNotNull(requestDeser);
+		responseDeserializer = Preconditions.checkNotNull(responseDeser);
+	}
+
+	// ------------------------------------------------------------------------
+	// Serialization
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Serializes the request sent to the
+	 * {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
+	 *
+	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+	 * @param requestId		The id of the request to which the message refers to.
+	 * @param request		The request to be serialized.
+	 * @return A {@link ByteBuf} containing the serialized message.
+	 */
+	public static <REQ extends MessageBody> ByteBuf serializeRequest(
+			final ByteBufAllocator alloc,
+			final long requestId,
+			final REQ request) {
+		Preconditions.checkNotNull(request);
+		return writePayload(alloc, requestId, MessageType.REQUEST, request.serialize());
+	}
+
+	/**
+	 * Serializes the response sent to the
+	 * {@link org.apache.flink.queryablestate.network.Client}.
+	 *
+	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+	 * @param requestId		The id of the request to which the message refers to.
+	 * @param response		The response to be serialized.
+	 * @return A {@link ByteBuf} containing the serialized message.
+	 */
+	public static <RESP extends MessageBody> ByteBuf serializeResponse(
+			final ByteBufAllocator alloc,
+			final long requestId,
+			final RESP response) {
+		Preconditions.checkNotNull(response);
+		return writePayload(alloc, requestId, MessageType.REQUEST_RESULT, response.serialize());
+	}
+
+	/**
+	 * Serializes the exception containing the failure message sent to the
+	 * {@link org.apache.flink.queryablestate.network.Client} in case of
+	 * protocol related errors.
+	 *
+	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+	 * @param requestId		The id of the request to which the message refers to.
+	 * @param cause			The exception thrown at the server.
+	 * @return A {@link ByteBuf} containing the serialized message.
+	 */
+	public static ByteBuf serializeRequestFailure(
+			final ByteBufAllocator alloc,
+			final long requestId,
+			final Throwable cause) throws IOException {
+
+		final ByteBuf buf = alloc.ioBuffer();
+
+		// Frame length is set at the end
+		buf.writeInt(0);
+		writeHeader(buf, MessageType.REQUEST_FAILURE);
+		buf.writeLong(requestId);
+
+		try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
+				ObjectOutput out = new ObjectOutputStream(bbos)) {
+			out.writeObject(cause);
+		}
+
+		// Set frame length
+		int frameLength = buf.readableBytes() - Integer.BYTES;
+		buf.setInt(0, frameLength);
+		return buf;
+	}
+
+	/**
+	 * Serializes the failure message sent to the
+	 * {@link org.apache.flink.queryablestate.network.Client} in case of
+	 * server related errors.
+	 *
+	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+	 * @param cause			The exception thrown at the server.
+	 * @return		The failure message.
+	 */
+	public static ByteBuf serializeServerFailure(
+			final ByteBufAllocator alloc,
+			final Throwable cause) throws IOException {
+
+		final ByteBuf buf = alloc.ioBuffer();
+
+		// Frame length is set at end
+		buf.writeInt(0);
+		writeHeader(buf, MessageType.SERVER_FAILURE);
+
+		try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
+				ObjectOutput out = new ObjectOutputStream(bbos)) {
+			out.writeObject(cause);
+		}
+
+		// Set frame length
+		int frameLength = buf.readableBytes() - Integer.BYTES;
+		buf.setInt(0, frameLength);
+		return buf;
+	}
+
+	/**
+	 * Helper for serializing the header.
+	 *
+	 * @param buf         The {@link ByteBuf} to serialize the header into.
+	 * @param messageType The {@link MessageType} of the message this header refers to.
+	 */
+	private static void writeHeader(final ByteBuf buf, final MessageType messageType) {
+		buf.writeInt(VERSION);
+		buf.writeInt(messageType.ordinal());
+	}
+
+	/**
+	 * Helper for serializing the messages.
+	 *
+	 * @param alloc			The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+	 * @param requestId		The id of the request to which the message refers to.
+	 * @param messageType	The {@link MessageType type of the message}.
+	 * @param payload		The serialized version of the message.
+	 * @return A {@link ByteBuf} containing the serialized message.
+	 */
+	private static ByteBuf writePayload(
+			final ByteBufAllocator alloc,
+			final long requestId,
+			final MessageType messageType,
+			final byte[] payload) {
+
+		final int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + payload.length;
+		final ByteBuf buf = alloc.ioBuffer(frameLength + Integer.BYTES);
+
+		buf.writeInt(frameLength);
+		writeHeader(buf, messageType);
+		buf.writeLong(requestId);
+		buf.writeBytes(payload);
+		return buf;
+	}
+
+	// ------------------------------------------------------------------------
+	// Deserialization
+	// ------------------------------------------------------------------------
+
+	/**
+	 * De-serializes the header and returns the {@link MessageType}.
+	 * <pre>
+	 *  <b>The buffer is expected to be at the header position.</b>
+	 * </pre>
+	 * @param buf						The {@link ByteBuf} containing the serialized header.
+	 * @return							The message type.
+	 * @throws IllegalStateException	If unexpected message version or message type.
+	 */
+	public static MessageType deserializeHeader(final ByteBuf buf) {
+
+		// checking the version
+		int version = buf.readInt();
+		Preconditions.checkState(version == VERSION,
+				"Version Mismatch:  Found " + version + ", Expected: " + VERSION + '.');
+
+		// fetching the message type
+		int msgType = buf.readInt();
+		MessageType[] values = MessageType.values();
+		Preconditions.checkState(msgType >= 0 && msgType < values.length,
+				"Illegal message type with index " + msgType + '.');
+		return values[msgType];
+	}
+
+	/**
+	 * De-serializes the header and returns the {@link MessageType}.
+	 * <pre>
+	 *  <b>The buffer is expected to be at the request id position.</b>
+	 * </pre>
+	 * @param buf	The {@link ByteBuf} containing the serialized request id.
+	 * @return		The request id.
+	 */
+	public static long getRequestId(final ByteBuf buf) {
+		return buf.readLong();
+	}
+
+	/**
+	 * De-serializes the request sent to the
+	 * {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
+	 * <pre>
+	 *  <b>The buffer is expected to be at the request position.</b>
+	 * </pre>
+	 * @param buf	The {@link ByteBuf} containing the serialized request.
+	 * @return		The request.
+	 */
+	public REQ deserializeRequest(final ByteBuf buf) {
+		Preconditions.checkNotNull(buf);
+		return requestDeserializer.deserializeMessage(buf);
+	}
+
+	/**
+	 * De-serializes the response sent to the
+	 * {@link org.apache.flink.queryablestate.network.Client}.
+	 * <pre>
+	 *  <b>The buffer is expected to be at the response position.</b>
+	 * </pre>
+	 * @param buf	The {@link ByteBuf} containing the serialized response.
+	 * @return		The response.
+	 */
+	public RESP deserializeResponse(final ByteBuf buf) {
+		Preconditions.checkNotNull(buf);
+		return responseDeserializer.deserializeMessage(buf);
+	}
+
+	/**
+	 * De-serializes the {@link RequestFailure} sent to the
+	 * {@link org.apache.flink.queryablestate.network.Client} in case of
+	 * protocol related errors.
+	 * <pre>
+	 *  <b>The buffer is expected to be at the correct position.</b>
+	 * </pre>
+	 * @param buf	The {@link ByteBuf} containing the serialized failure message.
+	 * @return		The failure message.
+	 */
+	public static RequestFailure deserializeRequestFailure(final ByteBuf buf) throws IOException, ClassNotFoundException {
+		long requestId = buf.readLong();
+
+		Throwable cause;
+		try (ByteBufInputStream bis = new ByteBufInputStream(buf);
+				ObjectInputStream in = new ObjectInputStream(bis)) {
+			cause = (Throwable) in.readObject();
+		}
+		return new RequestFailure(requestId, cause);
+	}
+
+	/**
+	 * De-serializes the failure message sent to the
+	 * {@link org.apache.flink.queryablestate.network.Client} in case of
+	 * server related errors.
+	 * <pre>
+	 *  <b>The buffer is expected to be at the correct position.</b>
+	 * </pre>
+	 * @param buf	The {@link ByteBuf} containing the serialized failure message.
+	 * @return		The failure message.
+	 */
+	public static Throwable deserializeServerFailure(final ByteBuf buf) throws IOException, ClassNotFoundException {
+		try (ByteBufInputStream bis = new ByteBufInputStream(buf);
+				ObjectInputStream in = new ObjectInputStream(bis)) {
+			return (Throwable) in.readObject();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
new file mode 100644
index 0000000..562ce93
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Expected message types during the communication between
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
+ */
+@Internal
+public enum MessageType {
+
+	/** The message is a request. */
+	REQUEST,
+
+	/** The message is a successful response. */
+	REQUEST_RESULT,
+
+	/** The message indicates a protocol-related failure. */
+	REQUEST_FAILURE,
+
+	/** The message indicates a server failure. */
+	SERVER_FAILURE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
new file mode 100644
index 0000000..106199f
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A message indicating a protocol-related error.
+ */
+@Internal
+public class RequestFailure {
+
+	/** ID of the request responding to. */
+	private final long requestId;
+
+	/** Failure cause. Not allowed to be a user type. */
+	private final Throwable cause;
+
+	/**
+	 * Creates a failure response to a {@link MessageBody}.
+	 *
+	 * @param requestId ID for the request responding to
+	 * @param cause     Failure cause (not allowed to be a user type)
+	 */
+	public RequestFailure(long requestId, Throwable cause) {
+		this.requestId = requestId;
+		this.cause = cause;
+	}
+
+	/**
+	 * Returns the request ID responding to.
+	 *
+	 * @return Request ID responding to
+	 */
+	public long getRequestId() {
+		return requestId;
+	}
+
+	/**
+	 * Returns the failure cause.
+	 *
+	 * @return Failure cause
+	 */
+	public Throwable getCause() {
+		return cause;
+	}
+
+	@Override
+	public String toString() {
+		return "RequestFailure{" +
+				"requestId=" + requestId +
+				", cause=" + cause +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java
new file mode 100644
index 0000000..9ba5f84
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.stats;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Atomic {@link KvStateRequestStats} implementation.
+ */
+public class AtomicKvStateRequestStats implements KvStateRequestStats {
+
+	/**
+	 * Number of active connections.
+	 */
+	private final AtomicLong numConnections = new AtomicLong();
+
+	/**
+	 * Total number of reported requests.
+	 */
+	private final AtomicLong numRequests = new AtomicLong();
+
+	/**
+	 * Total number of successful requests (<= reported requests).
+	 */
+	private final AtomicLong numSuccessful = new AtomicLong();
+
+	/**
+	 * Total duration of all successful requests.
+	 */
+	private final AtomicLong successfulDuration = new AtomicLong();
+
+	/**
+	 * Total number of failed requests (<= reported requests).
+	 */
+	private final AtomicLong numFailed = new AtomicLong();
+
+	@Override
+	public void reportActiveConnection() {
+		numConnections.incrementAndGet();
+	}
+
+	@Override
+	public void reportInactiveConnection() {
+		numConnections.decrementAndGet();
+	}
+
+	@Override
+	public void reportRequest() {
+		numRequests.incrementAndGet();
+	}
+
+	@Override
+	public void reportSuccessfulRequest(long durationTotalMillis) {
+		numSuccessful.incrementAndGet();
+		successfulDuration.addAndGet(durationTotalMillis);
+	}
+
+	@Override
+	public void reportFailedRequest() {
+		numFailed.incrementAndGet();
+	}
+
+	public long getNumConnections() {
+		return numConnections.get();
+	}
+
+	public long getNumRequests() {
+		return numRequests.get();
+	}
+
+	public long getNumSuccessful() {
+		return numSuccessful.get();
+	}
+
+	public long getNumFailed() {
+		return numFailed.get();
+	}
+
+	@Override
+	public String toString() {
+		return "AtomicKvStateRequestStats{" +
+				"numConnections=" + numConnections +
+				", numRequests=" + numRequests +
+				", numSuccessful=" + numSuccessful +
+				", numFailed=" + numFailed +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java
new file mode 100644
index 0000000..b34ac3e
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.stats;
+
+/**
+ * Disabled {@link KvStateRequestStats} implementation.
+ */
+public class DisabledKvStateRequestStats implements KvStateRequestStats {
+
+	@Override
+	public void reportActiveConnection() {
+	}
+
+	@Override
+	public void reportInactiveConnection() {
+	}
+
+	@Override
+	public void reportRequest() {
+	}
+
+	@Override
+	public void reportSuccessfulRequest(long durationTotalMillis) {
+	}
+
+	@Override
+	public void reportFailedRequest() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java
new file mode 100644
index 0000000..8e9edd8
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.stats;
+
+/**
+ * Simple statistics for monitoring the state server
+ * and the client proxy.
+ */
+public interface KvStateRequestStats {
+
+	/**
+	 * Reports an active connection.
+	 */
+	void reportActiveConnection();
+
+	/**
+	 * Reports an inactive connection.
+	 */
+	void reportInactiveConnection();
+
+	/**
+	 * Reports an incoming request.
+	 */
+	void reportRequest();
+
+	/**
+	 * Reports a successfully handled request.
+	 *
+	 * @param durationTotalMillis Duration of the request (in milliseconds).
+	 */
+	void reportSuccessfulRequest(long durationTotalMillis);
+
+	/**
+	 * Reports a failure during a request.
+	 */
+	void reportFailedRequest();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java
new file mode 100644
index 0000000..ca11a32
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client;
+
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+
+/**
+ * Test for {@link VoidNamespaceTypeInfo}.
+ */
+public class VoidNamespaceTypeInfoTest extends TypeInformationTestBase<VoidNamespaceTypeInfo> {
+
+	@Override
+	protected VoidNamespaceTypeInfo[] getTestData() {
+		return new VoidNamespaceTypeInfo[] { VoidNamespaceTypeInfo.INSTANCE };
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
new file mode 100644
index 0000000..ebbc896
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableAggregatingStateTest}.
+ */
+public class ImmutableAggregatingStateTest {
+
+	private final AggregatingStateDescriptor<Long, String, String> aggrStateDesc =
+			new AggregatingStateDescriptor<>(
+					"test",
+					new SumAggr(),
+					String.class);
+
+	private ImmutableAggregatingState<Long, String> aggrState;
+
+	@Before
+	public void setUp() throws Exception {
+		if (!aggrStateDesc.isSerializerInitialized()) {
+			aggrStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
+
+		final String initValue = "42";
+
+		ByteArrayOutputStream out = new ByteArrayOutputStream();
+		aggrStateDesc.getSerializer().serialize(initValue, new DataOutputViewStreamWrapper(out));
+
+		aggrState = ImmutableAggregatingState.createState(
+				aggrStateDesc,
+				out.toByteArray()
+		);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testUpdate() {
+		String value = aggrState.get();
+		assertEquals("42", value);
+
+		aggrState.add(54L);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testClear() {
+		String value = aggrState.get();
+		assertEquals("42", value);
+
+		aggrState.clear();
+	}
+
+	/**
+	 * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument.
+	 */
+	private static class SumAggr implements AggregateFunction<Long, String, String> {
+
+		private static final long serialVersionUID = -6249227626701264599L;
+
+		@Override
+		public String createAccumulator() {
+			return "";
+		}
+
+		@Override
+		public String add(Long value, String accumulator) {
+			accumulator += ", " + value;
+			return accumulator;
+		}
+
+		@Override
+		public String getResult(String accumulator) {
+			return accumulator;
+		}
+
+		@Override
+		public String merge(String a, String b) {
+			return a + ", " + b;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
new file mode 100644
index 0000000..9e8dfc9
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableFoldingState}.
+ */
+public class ImmutableFoldingStateTest {
+
+	private final FoldingStateDescriptor<Long, String> foldingStateDesc =
+			new FoldingStateDescriptor<>(
+					"test",
+					"0",
+					new SumFold(),
+					StringSerializer.INSTANCE);
+
+	private ImmutableFoldingState<Long, String> foldingState;
+
+	@Before
+	public void setUp() throws Exception {
+		if (!foldingStateDesc.isSerializerInitialized()) {
+			foldingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
+
+		ByteArrayOutputStream out = new ByteArrayOutputStream();
+		StringSerializer.INSTANCE.serialize("42", new DataOutputViewStreamWrapper(out));
+
+		foldingState = ImmutableFoldingState.createState(
+				foldingStateDesc,
+				out.toByteArray()
+		);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testUpdate() {
+		String value = foldingState.get();
+		assertEquals("42", value);
+
+		foldingState.add(54L);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testClear() {
+		String value = foldingState.get();
+		assertEquals("42", value);
+
+		foldingState.clear();
+	}
+
+	/**
+	 * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument.
+	 */
+	private static class SumFold implements FoldFunction<Long, String> {
+
+		private static final long serialVersionUID = -6249227626701264599L;
+
+		@Override
+		public String fold(String accumulator, Long value) throws Exception {
+			long acc = Long.valueOf(accumulator);
+			acc += value;
+			return Long.toString(acc);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
new file mode 100644
index 0000000..a78ed1f
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableListState}.
+ */
+public class ImmutableListStateTest {
+
+	private final ListStateDescriptor<Long> listStateDesc =
+			new ListStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
+
+	private ImmutableListState<Long> listState;
+
+	@Before
+	public void setUp() throws Exception {
+		if (!listStateDesc.isSerializerInitialized()) {
+			listStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
+
+		List<Long> init = new ArrayList<>();
+		init.add(42L);
+
+		byte[] serInit = serializeInitValue(init);
+		listState = ImmutableListState.createState(listStateDesc, serInit);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testUpdate() {
+		List<Long> list = getStateContents();
+		assertEquals(1L, list.size());
+
+		long element = list.get(0);
+		assertEquals(42L, element);
+
+		listState.add(54L);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testClear() {
+		List<Long> list = getStateContents();
+		assertEquals(1L, list.size());
+
+		long element = list.get(0);
+		assertEquals(42L, element);
+
+		listState.clear();
+	}
+
+	/**
+	 * Copied from HeapListState.getSerializedValue(Object, Object).
+	 */
+	private byte[] serializeInitValue(List<Long> toSerialize) throws IOException {
+		TypeSerializer<Long> serializer = listStateDesc.getElementSerializer();
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
+
+		// write the same as RocksDB writes lists, with one ',' separator
+		for (int i = 0; i < toSerialize.size(); i++) {
+			serializer.serialize(toSerialize.get(i), view);
+			if (i < toSerialize.size() - 1) {
+				view.writeByte(',');
+			}
+		}
+		view.flush();
+
+		return baos.toByteArray();
+	}
+
+	private List<Long> getStateContents() {
+		List<Long> list = new ArrayList<>();
+		for (Long elem: listState.get()) {
+			list.add(elem);
+		}
+		return list;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
new file mode 100644
index 0000000..ffeabae
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@link ImmutableMapState}.
+ */
+public class ImmutableMapStateTest {
+
+	private final MapStateDescriptor<Long, Long> mapStateDesc =
+			new MapStateDescriptor<>(
+					"test",
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO);
+
+	private ImmutableMapState<Long, Long> mapState;
+
+	@Before
+	public void setUp() throws Exception {
+		if (!mapStateDesc.isSerializerInitialized()) {
+			mapStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
+
+		Map<Long, Long> initMap = new HashMap<>();
+		initMap.put(1L, 5L);
+		initMap.put(2L, 5L);
+
+		byte[] initSer = KvStateSerializer.serializeMap(
+				initMap.entrySet(),
+				BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
+
+		mapState = ImmutableMapState.createState(mapStateDesc, initSer);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testPut() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		mapState.put(2L, 54L);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testPutAll() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		Map<Long, Long> nMap = new HashMap<>();
+		nMap.put(1L, 7L);
+		nMap.put(2L, 7L);
+
+		mapState.putAll(nMap);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testUpdate() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		mapState.put(2L, 54L);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testIterator() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		Iterator<Map.Entry<Long, Long>> iterator = mapState.iterator();
+		while (iterator.hasNext()) {
+			iterator.remove();
+		}
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testIterable() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		Iterable<Map.Entry<Long, Long>> iterable = mapState.entries();
+		Iterator<Map.Entry<Long, Long>> iterator = iterable.iterator();
+		while (iterator.hasNext()) {
+			assertEquals(5L, (long) iterator.next().getValue());
+			iterator.remove();
+		}
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testKeys() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		Iterator<Long> iterator = mapState.keys().iterator();
+		while (iterator.hasNext()) {
+			iterator.remove();
+		}
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testValues() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		Iterator<Long> iterator = mapState.values().iterator();
+		while (iterator.hasNext()) {
+			iterator.remove();
+		}
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testClear() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		mapState.clear();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
new file mode 100644
index 0000000..9694f55
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableReducingState}.
+ */
+public class ImmutableReducingStateTest {
+
+	private final ReducingStateDescriptor<Long> reducingStateDesc =
+			new ReducingStateDescriptor<>("test", new SumReduce(), BasicTypeInfo.LONG_TYPE_INFO);
+
+	private ImmutableReducingState<Long> reduceState;
+
+	@Before
+	public void setUp() throws Exception {
+		if (!reducingStateDesc.isSerializerInitialized()) {
+			reducingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
+
+		reduceState = ImmutableReducingState.createState(
+				reducingStateDesc,
+				ByteBuffer.allocate(Long.BYTES).putLong(42L).array()
+		);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testUpdate() {
+		long value = reduceState.get();
+		assertEquals(42L, value);
+
+		reduceState.add(54L);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testClear() {
+		long value = reduceState.get();
+		assertEquals(42L, value);
+
+		reduceState.clear();
+	}
+
+	/**
+	 * Test {@link ReduceFunction} summing up its two arguments.
+	 */
+	private static class SumReduce implements ReduceFunction<Long> {
+
+		private static final long serialVersionUID = 6041237513913189144L;
+
+		@Override
+		public Long reduce(Long value1, Long value2) throws Exception {
+			return value1 + value2;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
new file mode 100644
index 0000000..a0da43d
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableValueState}.
+ */
+public class ImmutableValueStateTest {
+
+	private final ValueStateDescriptor<Long> valueStateDesc =
+			new ValueStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
+
+	private ImmutableValueState<Long> valueState;
+
+	@Before
+	public void setUp() throws Exception {
+		if (!valueStateDesc.isSerializerInitialized()) {
+			valueStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
+
+		valueState = ImmutableValueState.createState(
+				valueStateDesc,
+				ByteBuffer.allocate(Long.BYTES).putLong(42L).array()
+		);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testUpdate() {
+		long value = valueState.value();
+		assertEquals(42L, value);
+
+		valueState.update(54L);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testClear() {
+		long value = valueState.value();
+		assertEquals(42L, value);
+
+		valueState.clear();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties b/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..10792cd
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
+log4j.logger.org.apache.zookeeper=OFF

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/pom.xml b/flink-queryable-state/flink-queryable-state-java/pom.xml
deleted file mode 100644
index e60c6f3..0000000
--- a/flink-queryable-state/flink-queryable-state-java/pom.xml
+++ /dev/null
@@ -1,137 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-queryable-state</artifactId>
-		<version>1.4-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-queryable-state-java_${scala.binary.version}</artifactId>
-	<name>flink-queryable-state-java</name>
-	<packaging>jar</packaging>
-
-	<dependencies>
-
-		<!-- core dependencies -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<!-- test dependencies -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-	   <dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<!-- ===================================================
-								Testing
-			=================================================== -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils-junit</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.curator</groupId>
-			<artifactId>curator-test</artifactId>
-			<version>${curator.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
deleted file mode 100644
index fa2604b..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * Exception to fail Future if the Task Manager on which the
- * {@link org.apache.flink.runtime.query.KvStateClientProxy}
- * is running on, does not know the active Job Manager.
- */
-@Internal
-public class UnknownJobManagerException extends Exception {
-
-	private static final long serialVersionUID = 9092442511708951209L;
-
-	public UnknownJobManagerException() {
-		super("Unknown JobManager. Either the JobManager has not registered yet or has lost leadership.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
deleted file mode 100644
index c497a72..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.BadRequestException;
-
-/**
- * Thrown if the KvState does not hold any state for the given key or namespace.
- */
-@Internal
-public class UnknownKeyOrNamespaceException extends BadRequestException {
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Creates the exception.
-	 * @param serverName the name of the server that threw the exception.
-	 */
-	public UnknownKeyOrNamespaceException(String serverName) {
-		super(serverName, "No state for the specified key/namespace.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
deleted file mode 100644
index 59ba081..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.BadRequestException;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Thrown if no KvState with the given ID cannot found by the server handler.
- */
-@Internal
-public class UnknownKvStateIdException extends BadRequestException {
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Creates the exception.
-	 * @param serverName the name of the server that threw the exception.
-	 * @param kvStateId the state id for which no state was found.
-	 */
-	public UnknownKvStateIdException(String serverName, KvStateID kvStateId) {
-		super(serverName, "No registered state with ID " + Preconditions.checkNotNull(kvStateId) + '.');
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
deleted file mode 100644
index 0d6588a..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.BadRequestException;
-import org.apache.flink.runtime.query.KvStateLocation;
-
-/**
- * Exception thrown if there is no location information available for the given
- * key group in a {@link KvStateLocation} instance.
- */
-@Internal
-public class UnknownKvStateKeyGroupLocationException extends BadRequestException {
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Creates the exception.
-	 * @param serverName the name of the server that threw the exception.
-	 */
-	public UnknownKvStateKeyGroupLocationException(String serverName) {
-		super(serverName, "Unknown key-group location.");
-	}
-}