You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/26 17:01:59 UTC
[10/13] flink git commit: [FLINK-7908][QS] Restructure the queryable
state module.
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
new file mode 100644
index 0000000..fc9b1d4
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.network.messages.RequestFailure;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * The handler used by a {@link Client} to handling incoming messages.
+ *
+ * @param <REQ> the type of request the client will send.
+ * @param <RESP> the type of response the client expects to receive.
+ */
+@Internal
+public class ClientHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ClientHandler.class);
+
+ private final String clientName;
+
+ private final MessageSerializer<REQ, RESP> serializer;
+
+ private final ClientHandlerCallback<RESP> callback;
+
+ /**
+ * Creates a handler with the callback.
+ *
+ * @param clientName the name of the client.
+ * @param serializer the serializer used to (de-)serialize messages.
+ * @param callback Callback for responses.
+ */
+ public ClientHandler(
+ final String clientName,
+ final MessageSerializer<REQ, RESP> serializer,
+ final ClientHandlerCallback<RESP> callback) {
+
+ this.clientName = Preconditions.checkNotNull(clientName);
+ this.serializer = Preconditions.checkNotNull(serializer);
+ this.callback = Preconditions.checkNotNull(callback);
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ try {
+ ByteBuf buf = (ByteBuf) msg;
+ MessageType msgType = MessageSerializer.deserializeHeader(buf);
+
+ if (msgType == MessageType.REQUEST_RESULT) {
+ long requestId = MessageSerializer.getRequestId(buf);
+ RESP result = serializer.deserializeResponse(buf);
+ callback.onRequestResult(requestId, result);
+ } else if (msgType == MessageType.REQUEST_FAILURE) {
+ RequestFailure failure = MessageSerializer.deserializeRequestFailure(buf);
+ callback.onRequestFailure(failure.getRequestId(), failure.getCause());
+ } else if (msgType == MessageType.SERVER_FAILURE) {
+ throw MessageSerializer.deserializeServerFailure(buf);
+ } else {
+ throw new IllegalStateException("Unexpected response type '" + msgType + "'");
+ }
+ } catch (Throwable t1) {
+ try {
+ callback.onFailure(t1);
+ } catch (Throwable t2) {
+ LOG.error("Failed to notify callback about failure", t2);
+ }
+ } finally {
+ ReferenceCountUtil.release(msg);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ try {
+ callback.onFailure(cause);
+ } catch (Throwable t) {
+ LOG.error("Failed to notify callback about failure", t);
+ }
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ // Only the client is expected to close the channel. Otherwise it
+ // indicates a failure. Note that this will be invoked in both cases
+ // though. If the callback closed the channel, the callback must be
+ // ignored.
+ try {
+ callback.onFailure(new ClosedChannelException());
+ } catch (Throwable t) {
+ LOG.error("Failed to notify callback about failure", t);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
new file mode 100644
index 0000000..00ce1ed
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+
+/**
+ * Callback for {@link ClientHandler}.
+ */
+@Internal
+public interface ClientHandlerCallback<RESP extends MessageBody> {
+
+ /**
+ * Called on a successful request.
+ *
+ * @param requestId ID of the request
+ * @param response The received response
+ */
+ void onRequestResult(long requestId, RESP response);
+
+ /**
+ * Called on a failed request.
+ *
+ * @param requestId ID of the request
+ * @param cause Cause of the request failure
+ */
+ void onRequestFailure(long requestId, Throwable cause);
+
+ /**
+ * Called on any failure, which is not related to a specific request.
+ *
+ * <p>This can be for example a caught Exception in the channel pipeline
+ * or an unexpected channel close.
+ *
+ * @param cause Cause of the failure
+ */
+ void onFailure(Throwable cause);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
new file mode 100644
index 0000000..5e014b8
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.CompositeByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Wrapper around Netty's {@link PooledByteBufAllocator} with strict control
+ * over the number of created arenas.
+ */
+public class NettyBufferPool implements ByteBufAllocator {
+
+ /** The wrapped buffer allocator. */
+ private final PooledByteBufAllocator alloc;
+
+ /**
+ * Creates Netty's buffer pool with the specified number of direct arenas.
+ *
+ * @param numberOfArenas Number of arenas (recommended: 2 * number of task
+ * slots)
+ */
+ public NettyBufferPool(int numberOfArenas) {
+ checkArgument(numberOfArenas >= 1, "Number of arenas");
+
+ // We strictly prefer direct buffers and disallow heap allocations.
+ boolean preferDirect = true;
+
+ // Arenas allocate chunks of pageSize << maxOrder bytes. With these
+ // defaults, this results in chunks of 16 MB.
+ int pageSize = 8192;
+ int maxOrder = 11;
+
+ // Number of direct arenas. Each arena allocates a chunk of 16 MB, i.e.
+ // we allocate numDirectArenas * 16 MB of direct memory. This can grow
+ // to multiple chunks per arena during runtime, but this should only
+ // happen with a large amount of connections per task manager. We
+ // control the memory allocations with low/high watermarks when writing
+ // to the TCP channels. Chunks are allocated lazily.
+ int numDirectArenas = numberOfArenas;
+
+ // No heap arenas, please.
+ int numHeapArenas = 0;
+
+ this.alloc = new PooledByteBufAllocator(
+ preferDirect,
+ numHeapArenas,
+ numDirectArenas,
+ pageSize,
+ maxOrder);
+ }
+
+ // ------------------------------------------------------------------------
+ // Delegate calls to the allocated and prohibit heap buffer allocations
+ // ------------------------------------------------------------------------
+
+ @Override
+ public ByteBuf buffer() {
+ return alloc.buffer();
+ }
+
+ @Override
+ public ByteBuf buffer(int initialCapacity) {
+ return alloc.buffer(initialCapacity);
+ }
+
+ @Override
+ public ByteBuf buffer(int initialCapacity, int maxCapacity) {
+ return alloc.buffer(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public ByteBuf ioBuffer() {
+ return alloc.ioBuffer();
+ }
+
+ @Override
+ public ByteBuf ioBuffer(int initialCapacity) {
+ return alloc.ioBuffer(initialCapacity);
+ }
+
+ @Override
+ public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
+ return alloc.ioBuffer(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public ByteBuf heapBuffer() {
+ throw new UnsupportedOperationException("Heap buffer");
+ }
+
+ @Override
+ public ByteBuf heapBuffer(int initialCapacity) {
+ throw new UnsupportedOperationException("Heap buffer");
+ }
+
+ @Override
+ public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+ throw new UnsupportedOperationException("Heap buffer");
+ }
+
+ @Override
+ public ByteBuf directBuffer() {
+ return alloc.directBuffer();
+ }
+
+ @Override
+ public ByteBuf directBuffer(int initialCapacity) {
+ return alloc.directBuffer(initialCapacity);
+ }
+
+ @Override
+ public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
+ return alloc.directBuffer(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public CompositeByteBuf compositeBuffer() {
+ return alloc.compositeBuffer();
+ }
+
+ @Override
+ public CompositeByteBuf compositeBuffer(int maxNumComponents) {
+ return alloc.compositeBuffer(maxNumComponents);
+ }
+
+ @Override
+ public CompositeByteBuf compositeHeapBuffer() {
+ throw new UnsupportedOperationException("Heap buffer");
+ }
+
+ @Override
+ public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
+ throw new UnsupportedOperationException("Heap buffer");
+ }
+
+ @Override
+ public CompositeByteBuf compositeDirectBuffer() {
+ return alloc.compositeDirectBuffer();
+ }
+
+ @Override
+ public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
+ return alloc.compositeDirectBuffer(maxNumComponents);
+ }
+
+ @Override
+ public boolean isDirectBufferPooled() {
+ return alloc.isDirectBufferPooled();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
new file mode 100644
index 0000000..f26c267
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The base class for every message exchanged during the communication between
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
+ *
+ * <p>Every such message should also have a {@link MessageDeserializer}.
+ */
+@Internal
+public abstract class MessageBody {
+
+ /**
+ * Serializes the message into a byte array.
+ * @return A byte array with the serialized content of the message.
+ */
+ public abstract byte[] serialize();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
new file mode 100644
index 0000000..436fb82
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+/**
+ * A utility used to deserialize a {@link MessageBody message}.
+ * @param <M> The type of the message to be deserialized.
+ * It has to extend {@link MessageBody}
+ */
+@Internal
+public interface MessageDeserializer<M extends MessageBody> {
+
+ /**
+ * Deserializes a message contained in a byte buffer.
+ * @param buf the buffer containing the message.
+ * @return The deserialized message.
+ */
+ M deserializeMessage(ByteBuf buf);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
new file mode 100644
index 0000000..c0a0d32
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufOutputStream;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+
+/**
+ * Serialization and deserialization of messages exchanged between
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
+ *
+ * <p>The binary messages have the following format:
+ *
+ * <pre>
+ * <------ Frame ------------------------->
+ * +----------------------------------------+
+ * | HEADER (8) | PAYLOAD (VAR) |
+ * +------------------+----------------------------------------+
+ * | FRAME LENGTH (4) | VERSION (4) | TYPE (4) | CONTENT (VAR) |
+ * +------------------+----------------------------------------+
+ * </pre>
+ *
+ * <p>The concrete content of a message depends on the {@link MessageType}.
+ *
+ * @param <REQ> Type of the requests of the protocol.
+ * @param <RESP> Type of the responses of the protocol.
+ */
+@Internal
+public final class MessageSerializer<REQ extends MessageBody, RESP extends MessageBody> {
+
+ /** The serialization version ID. */
+ private static final int VERSION = 0x79a1b710;
+
+ /** Byte length of the header. */
+ private static final int HEADER_LENGTH = 2 * Integer.BYTES;
+
+ /** Byte length of the request id. */
+ private static final int REQUEST_ID_SIZE = Long.BYTES;
+
+ /** The constructor of the {@link MessageBody client requests}. Used for deserialization. */
+ private final MessageDeserializer<REQ> requestDeserializer;
+
+ /** The constructor of the {@link MessageBody server responses}. Used for deserialization. */
+ private final MessageDeserializer<RESP> responseDeserializer;
+
+ public MessageSerializer(MessageDeserializer<REQ> requestDeser, MessageDeserializer<RESP> responseDeser) {
+ requestDeserializer = Preconditions.checkNotNull(requestDeser);
+ responseDeserializer = Preconditions.checkNotNull(responseDeser);
+ }
+
+ // ------------------------------------------------------------------------
+ // Serialization
+ // ------------------------------------------------------------------------
+
+ /**
+ * Serializes the request sent to the
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
+ *
+ * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+ * @param requestId The id of the request to which the message refers to.
+ * @param request The request to be serialized.
+ * @return A {@link ByteBuf} containing the serialized message.
+ */
+ public static <REQ extends MessageBody> ByteBuf serializeRequest(
+ final ByteBufAllocator alloc,
+ final long requestId,
+ final REQ request) {
+ Preconditions.checkNotNull(request);
+ return writePayload(alloc, requestId, MessageType.REQUEST, request.serialize());
+ }
+
+ /**
+ * Serializes the response sent to the
+ * {@link org.apache.flink.queryablestate.network.Client}.
+ *
+ * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+ * @param requestId The id of the request to which the message refers to.
+ * @param response The response to be serialized.
+ * @return A {@link ByteBuf} containing the serialized message.
+ */
+ public static <RESP extends MessageBody> ByteBuf serializeResponse(
+ final ByteBufAllocator alloc,
+ final long requestId,
+ final RESP response) {
+ Preconditions.checkNotNull(response);
+ return writePayload(alloc, requestId, MessageType.REQUEST_RESULT, response.serialize());
+ }
+
+ /**
+ * Serializes the exception containing the failure message sent to the
+ * {@link org.apache.flink.queryablestate.network.Client} in case of
+ * protocol related errors.
+ *
+ * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+ * @param requestId The id of the request to which the message refers to.
+ * @param cause The exception thrown at the server.
+ * @return A {@link ByteBuf} containing the serialized message.
+ */
+ public static ByteBuf serializeRequestFailure(
+ final ByteBufAllocator alloc,
+ final long requestId,
+ final Throwable cause) throws IOException {
+
+ final ByteBuf buf = alloc.ioBuffer();
+
+ // Frame length is set at the end
+ buf.writeInt(0);
+ writeHeader(buf, MessageType.REQUEST_FAILURE);
+ buf.writeLong(requestId);
+
+ try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
+ ObjectOutput out = new ObjectOutputStream(bbos)) {
+ out.writeObject(cause);
+ }
+
+ // Set frame length
+ int frameLength = buf.readableBytes() - Integer.BYTES;
+ buf.setInt(0, frameLength);
+ return buf;
+ }
+
+ /**
+ * Serializes the failure message sent to the
+ * {@link org.apache.flink.queryablestate.network.Client} in case of
+ * server related errors.
+ *
+ * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+ * @param cause The exception thrown at the server.
+ * @return The failure message.
+ */
+ public static ByteBuf serializeServerFailure(
+ final ByteBufAllocator alloc,
+ final Throwable cause) throws IOException {
+
+ final ByteBuf buf = alloc.ioBuffer();
+
+ // Frame length is set at end
+ buf.writeInt(0);
+ writeHeader(buf, MessageType.SERVER_FAILURE);
+
+ try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
+ ObjectOutput out = new ObjectOutputStream(bbos)) {
+ out.writeObject(cause);
+ }
+
+ // Set frame length
+ int frameLength = buf.readableBytes() - Integer.BYTES;
+ buf.setInt(0, frameLength);
+ return buf;
+ }
+
+ /**
+ * Helper for serializing the header.
+ *
+ * @param buf The {@link ByteBuf} to serialize the header into.
+ * @param messageType The {@link MessageType} of the message this header refers to.
+ */
+ private static void writeHeader(final ByteBuf buf, final MessageType messageType) {
+ buf.writeInt(VERSION);
+ buf.writeInt(messageType.ordinal());
+ }
+
+ /**
+ * Helper for serializing the messages.
+ *
+ * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
+ * @param requestId The id of the request to which the message refers to.
+ * @param messageType The {@link MessageType type of the message}.
+ * @param payload The serialized version of the message.
+ * @return A {@link ByteBuf} containing the serialized message.
+ */
+ private static ByteBuf writePayload(
+ final ByteBufAllocator alloc,
+ final long requestId,
+ final MessageType messageType,
+ final byte[] payload) {
+
+ final int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + payload.length;
+ final ByteBuf buf = alloc.ioBuffer(frameLength + Integer.BYTES);
+
+ buf.writeInt(frameLength);
+ writeHeader(buf, messageType);
+ buf.writeLong(requestId);
+ buf.writeBytes(payload);
+ return buf;
+ }
+
+ // ------------------------------------------------------------------------
+ // Deserialization
+ // ------------------------------------------------------------------------
+
+ /**
+ * De-serializes the header and returns the {@link MessageType}.
+ * <pre>
+ * <b>The buffer is expected to be at the header position.</b>
+ * </pre>
+ * @param buf The {@link ByteBuf} containing the serialized header.
+ * @return The message type.
+ * @throws IllegalStateException If unexpected message version or message type.
+ */
+ public static MessageType deserializeHeader(final ByteBuf buf) {
+
+ // checking the version
+ int version = buf.readInt();
+ Preconditions.checkState(version == VERSION,
+ "Version Mismatch: Found " + version + ", Expected: " + VERSION + '.');
+
+ // fetching the message type
+ int msgType = buf.readInt();
+ MessageType[] values = MessageType.values();
+ Preconditions.checkState(msgType >= 0 && msgType < values.length,
+ "Illegal message type with index " + msgType + '.');
+ return values[msgType];
+ }
+
+ /**
+ * De-serializes the header and returns the {@link MessageType}.
+ * <pre>
+ * <b>The buffer is expected to be at the request id position.</b>
+ * </pre>
+ * @param buf The {@link ByteBuf} containing the serialized request id.
+ * @return The request id.
+ */
+ public static long getRequestId(final ByteBuf buf) {
+ return buf.readLong();
+ }
+
+ /**
+ * De-serializes the request sent to the
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
+ * <pre>
+ * <b>The buffer is expected to be at the request position.</b>
+ * </pre>
+ * @param buf The {@link ByteBuf} containing the serialized request.
+ * @return The request.
+ */
+ public REQ deserializeRequest(final ByteBuf buf) {
+ Preconditions.checkNotNull(buf);
+ return requestDeserializer.deserializeMessage(buf);
+ }
+
+ /**
+ * De-serializes the response sent to the
+ * {@link org.apache.flink.queryablestate.network.Client}.
+ * <pre>
+ * <b>The buffer is expected to be at the response position.</b>
+ * </pre>
+ * @param buf The {@link ByteBuf} containing the serialized response.
+ * @return The response.
+ */
+ public RESP deserializeResponse(final ByteBuf buf) {
+ Preconditions.checkNotNull(buf);
+ return responseDeserializer.deserializeMessage(buf);
+ }
+
+ /**
+ * De-serializes the {@link RequestFailure} sent to the
+ * {@link org.apache.flink.queryablestate.network.Client} in case of
+ * protocol related errors.
+ * <pre>
+ * <b>The buffer is expected to be at the correct position.</b>
+ * </pre>
+ * @param buf The {@link ByteBuf} containing the serialized failure message.
+ * @return The failure message.
+ */
+ public static RequestFailure deserializeRequestFailure(final ByteBuf buf) throws IOException, ClassNotFoundException {
+ long requestId = buf.readLong();
+
+ Throwable cause;
+ try (ByteBufInputStream bis = new ByteBufInputStream(buf);
+ ObjectInputStream in = new ObjectInputStream(bis)) {
+ cause = (Throwable) in.readObject();
+ }
+ return new RequestFailure(requestId, cause);
+ }
+
+ /**
+ * De-serializes the failure message sent to the
+ * {@link org.apache.flink.queryablestate.network.Client} in case of
+ * server related errors.
+ * <pre>
+ * <b>The buffer is expected to be at the correct position.</b>
+ * </pre>
+ * @param buf The {@link ByteBuf} containing the serialized failure message.
+ * @return The failure message.
+ */
+ public static Throwable deserializeServerFailure(final ByteBuf buf) throws IOException, ClassNotFoundException {
+ try (ByteBufInputStream bis = new ByteBufInputStream(buf);
+ ObjectInputStream in = new ObjectInputStream(bis)) {
+ return (Throwable) in.readObject();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
new file mode 100644
index 0000000..562ce93
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Expected message types during the communication between
+ * {@link org.apache.flink.queryablestate.network.Client client} and
+ * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
+ */
+@Internal
+public enum MessageType {
+
+ /** The message is a request. */
+ REQUEST,
+
+ /** The message is a successful response. */
+ REQUEST_RESULT,
+
+ /** The message indicates a protocol-related failure. */
+ REQUEST_FAILURE,
+
+ /** The message indicates a server failure. */
+ SERVER_FAILURE
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
new file mode 100644
index 0000000..106199f
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.messages;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A message indicating a protocol-related error.
+ */
+@Internal
+public class RequestFailure {
+
+ /** ID of the request responding to. */
+ private final long requestId;
+
+ /** Failure cause. Not allowed to be a user type. */
+ private final Throwable cause;
+
+ /**
+ * Creates a failure response to a {@link MessageBody}.
+ *
+ * @param requestId ID for the request responding to
+ * @param cause Failure cause (not allowed to be a user type)
+ */
+ public RequestFailure(long requestId, Throwable cause) {
+ this.requestId = requestId;
+ this.cause = cause;
+ }
+
+ /**
+ * Returns the request ID responding to.
+ *
+ * @return Request ID responding to
+ */
+ public long getRequestId() {
+ return requestId;
+ }
+
+ /**
+ * Returns the failure cause.
+ *
+ * @return Failure cause
+ */
+ public Throwable getCause() {
+ return cause;
+ }
+
+ @Override
+ public String toString() {
+ return "RequestFailure{" +
+ "requestId=" + requestId +
+ ", cause=" + cause +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java
new file mode 100644
index 0000000..9ba5f84
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.stats;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Atomic {@link KvStateRequestStats} implementation.
+ */
+public class AtomicKvStateRequestStats implements KvStateRequestStats {
+
+ /**
+ * Number of active connections.
+ */
+ private final AtomicLong numConnections = new AtomicLong();
+
+ /**
+ * Total number of reported requests.
+ */
+ private final AtomicLong numRequests = new AtomicLong();
+
+ /**
+ * Total number of successful requests (<= reported requests).
+ */
+ private final AtomicLong numSuccessful = new AtomicLong();
+
+ /**
+ * Total duration of all successful requests.
+ */
+ private final AtomicLong successfulDuration = new AtomicLong();
+
+ /**
+ * Total number of failed requests (<= reported requests).
+ */
+ private final AtomicLong numFailed = new AtomicLong();
+
+ @Override
+ public void reportActiveConnection() {
+ numConnections.incrementAndGet();
+ }
+
+ @Override
+ public void reportInactiveConnection() {
+ numConnections.decrementAndGet();
+ }
+
+ @Override
+ public void reportRequest() {
+ numRequests.incrementAndGet();
+ }
+
+ @Override
+ public void reportSuccessfulRequest(long durationTotalMillis) {
+ numSuccessful.incrementAndGet();
+ successfulDuration.addAndGet(durationTotalMillis);
+ }
+
+ @Override
+ public void reportFailedRequest() {
+ numFailed.incrementAndGet();
+ }
+
+ public long getNumConnections() {
+ return numConnections.get();
+ }
+
+ public long getNumRequests() {
+ return numRequests.get();
+ }
+
+ public long getNumSuccessful() {
+ return numSuccessful.get();
+ }
+
+ public long getNumFailed() {
+ return numFailed.get();
+ }
+
+ @Override
+ public String toString() {
+ return "AtomicKvStateRequestStats{" +
+ "numConnections=" + numConnections +
+ ", numRequests=" + numRequests +
+ ", numSuccessful=" + numSuccessful +
+ ", numFailed=" + numFailed +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java
new file mode 100644
index 0000000..b34ac3e
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.stats;
+
+/**
+ * Disabled {@link KvStateRequestStats} implementation.
+ */
+public class DisabledKvStateRequestStats implements KvStateRequestStats {
+
+ @Override
+ public void reportActiveConnection() {
+ }
+
+ @Override
+ public void reportInactiveConnection() {
+ }
+
+ @Override
+ public void reportRequest() {
+ }
+
+ @Override
+ public void reportSuccessfulRequest(long durationTotalMillis) {
+ }
+
+ @Override
+ public void reportFailedRequest() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java
new file mode 100644
index 0000000..8e9edd8
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network.stats;
+
+/**
+ * Simple statistics for monitoring the state server
+ * and the client proxy.
+ */
+public interface KvStateRequestStats {
+
+ /**
+ * Reports an active connection.
+ */
+ void reportActiveConnection();
+
+ /**
+ * Reports an inactive connection.
+ */
+ void reportInactiveConnection();
+
+ /**
+ * Reports an incoming request.
+ */
+ void reportRequest();
+
+ /**
+ * Reports a successfully handled request.
+ *
+ * @param durationTotalMillis Duration of the request (in milliseconds).
+ */
+ void reportSuccessfulRequest(long durationTotalMillis);
+
+ /**
+ * Reports a failure during a request.
+ */
+ void reportFailedRequest();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java
new file mode 100644
index 0000000..ca11a32
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client;
+
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+
+/**
+ * Test for {@link VoidNamespaceTypeInfo}.
+ */
+public class VoidNamespaceTypeInfoTest extends TypeInformationTestBase<VoidNamespaceTypeInfo> {
+
+ @Override
+ protected VoidNamespaceTypeInfo[] getTestData() {
+ return new VoidNamespaceTypeInfo[] { VoidNamespaceTypeInfo.INSTANCE };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
new file mode 100644
index 0000000..ebbc896
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableAggregatingStateTest}.
+ */
+public class ImmutableAggregatingStateTest {
+
+ private final AggregatingStateDescriptor<Long, String, String> aggrStateDesc =
+ new AggregatingStateDescriptor<>(
+ "test",
+ new SumAggr(),
+ String.class);
+
+ private ImmutableAggregatingState<Long, String> aggrState;
+
+ @Before
+ public void setUp() throws Exception {
+ if (!aggrStateDesc.isSerializerInitialized()) {
+ aggrStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+ }
+
+ final String initValue = "42";
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ aggrStateDesc.getSerializer().serialize(initValue, new DataOutputViewStreamWrapper(out));
+
+ aggrState = ImmutableAggregatingState.createState(
+ aggrStateDesc,
+ out.toByteArray()
+ );
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testUpdate() {
+ String value = aggrState.get();
+ assertEquals("42", value);
+
+ aggrState.add(54L);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testClear() {
+ String value = aggrState.get();
+ assertEquals("42", value);
+
+ aggrState.clear();
+ }
+
+ /**
+ * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument.
+ */
+ private static class SumAggr implements AggregateFunction<Long, String, String> {
+
+ private static final long serialVersionUID = -6249227626701264599L;
+
+ @Override
+ public String createAccumulator() {
+ return "";
+ }
+
+ @Override
+ public String add(Long value, String accumulator) {
+ accumulator += ", " + value;
+ return accumulator;
+ }
+
+ @Override
+ public String getResult(String accumulator) {
+ return accumulator;
+ }
+
+ @Override
+ public String merge(String a, String b) {
+ return a + ", " + b;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
new file mode 100644
index 0000000..9e8dfc9
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableFoldingState}.
+ */
+public class ImmutableFoldingStateTest {
+
+ private final FoldingStateDescriptor<Long, String> foldingStateDesc =
+ new FoldingStateDescriptor<>(
+ "test",
+ "0",
+ new SumFold(),
+ StringSerializer.INSTANCE);
+
+ private ImmutableFoldingState<Long, String> foldingState;
+
+ @Before
+ public void setUp() throws Exception {
+ if (!foldingStateDesc.isSerializerInitialized()) {
+ foldingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+ }
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ StringSerializer.INSTANCE.serialize("42", new DataOutputViewStreamWrapper(out));
+
+ foldingState = ImmutableFoldingState.createState(
+ foldingStateDesc,
+ out.toByteArray()
+ );
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testUpdate() {
+ String value = foldingState.get();
+ assertEquals("42", value);
+
+ foldingState.add(54L);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testClear() {
+ String value = foldingState.get();
+ assertEquals("42", value);
+
+ foldingState.clear();
+ }
+
+ /**
+ * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument.
+ */
+ private static class SumFold implements FoldFunction<Long, String> {
+
+ private static final long serialVersionUID = -6249227626701264599L;
+
+ @Override
+ public String fold(String accumulator, Long value) throws Exception {
+ long acc = Long.valueOf(accumulator);
+ acc += value;
+ return Long.toString(acc);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
new file mode 100644
index 0000000..a78ed1f
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableListState}.
+ */
+public class ImmutableListStateTest {
+
+ private final ListStateDescriptor<Long> listStateDesc =
+ new ListStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
+
+ private ImmutableListState<Long> listState;
+
+ @Before
+ public void setUp() throws Exception {
+ if (!listStateDesc.isSerializerInitialized()) {
+ listStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+ }
+
+ List<Long> init = new ArrayList<>();
+ init.add(42L);
+
+ byte[] serInit = serializeInitValue(init);
+ listState = ImmutableListState.createState(listStateDesc, serInit);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testUpdate() {
+ List<Long> list = getStateContents();
+ assertEquals(1L, list.size());
+
+ long element = list.get(0);
+ assertEquals(42L, element);
+
+ listState.add(54L);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testClear() {
+ List<Long> list = getStateContents();
+ assertEquals(1L, list.size());
+
+ long element = list.get(0);
+ assertEquals(42L, element);
+
+ listState.clear();
+ }
+
+ /**
+ * Copied from HeapListState.getSerializedValue(Object, Object).
+ */
+ private byte[] serializeInitValue(List<Long> toSerialize) throws IOException {
+ TypeSerializer<Long> serializer = listStateDesc.getElementSerializer();
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
+
+ // write the same as RocksDB writes lists, with one ',' separator
+ for (int i = 0; i < toSerialize.size(); i++) {
+ serializer.serialize(toSerialize.get(i), view);
+ if (i < toSerialize.size() - 1) {
+ view.writeByte(',');
+ }
+ }
+ view.flush();
+
+ return baos.toByteArray();
+ }
+
+ private List<Long> getStateContents() {
+ List<Long> list = new ArrayList<>();
+ for (Long elem: listState.get()) {
+ list.add(elem);
+ }
+ return list;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
new file mode 100644
index 0000000..ffeabae
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@link ImmutableMapState}.
+ */
+public class ImmutableMapStateTest {
+
+ private final MapStateDescriptor<Long, Long> mapStateDesc =
+ new MapStateDescriptor<>(
+ "test",
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO);
+
+ private ImmutableMapState<Long, Long> mapState;
+
+ @Before
+ public void setUp() throws Exception {
+ if (!mapStateDesc.isSerializerInitialized()) {
+ mapStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+ }
+
+ Map<Long, Long> initMap = new HashMap<>();
+ initMap.put(1L, 5L);
+ initMap.put(2L, 5L);
+
+ byte[] initSer = KvStateSerializer.serializeMap(
+ initMap.entrySet(),
+ BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
+
+ mapState = ImmutableMapState.createState(mapStateDesc, initSer);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testPut() {
+ assertTrue(mapState.contains(1L));
+ long value = mapState.get(1L);
+ assertEquals(5L, value);
+
+ assertTrue(mapState.contains(2L));
+ value = mapState.get(2L);
+ assertEquals(5L, value);
+
+ mapState.put(2L, 54L);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testPutAll() {
+ assertTrue(mapState.contains(1L));
+ long value = mapState.get(1L);
+ assertEquals(5L, value);
+
+ assertTrue(mapState.contains(2L));
+ value = mapState.get(2L);
+ assertEquals(5L, value);
+
+ Map<Long, Long> nMap = new HashMap<>();
+ nMap.put(1L, 7L);
+ nMap.put(2L, 7L);
+
+ mapState.putAll(nMap);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testUpdate() {
+ assertTrue(mapState.contains(1L));
+ long value = mapState.get(1L);
+ assertEquals(5L, value);
+
+ assertTrue(mapState.contains(2L));
+ value = mapState.get(2L);
+ assertEquals(5L, value);
+
+ mapState.put(2L, 54L);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testIterator() {
+ assertTrue(mapState.contains(1L));
+ long value = mapState.get(1L);
+ assertEquals(5L, value);
+
+ assertTrue(mapState.contains(2L));
+ value = mapState.get(2L);
+ assertEquals(5L, value);
+
+ Iterator<Map.Entry<Long, Long>> iterator = mapState.iterator();
+ while (iterator.hasNext()) {
+ iterator.remove();
+ }
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testIterable() {
+ assertTrue(mapState.contains(1L));
+ long value = mapState.get(1L);
+ assertEquals(5L, value);
+
+ assertTrue(mapState.contains(2L));
+ value = mapState.get(2L);
+ assertEquals(5L, value);
+
+ Iterable<Map.Entry<Long, Long>> iterable = mapState.entries();
+ Iterator<Map.Entry<Long, Long>> iterator = iterable.iterator();
+ while (iterator.hasNext()) {
+ assertEquals(5L, (long) iterator.next().getValue());
+ iterator.remove();
+ }
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testKeys() {
+ assertTrue(mapState.contains(1L));
+ long value = mapState.get(1L);
+ assertEquals(5L, value);
+
+ assertTrue(mapState.contains(2L));
+ value = mapState.get(2L);
+ assertEquals(5L, value);
+
+ Iterator<Long> iterator = mapState.keys().iterator();
+ while (iterator.hasNext()) {
+ iterator.remove();
+ }
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testValues() {
+ assertTrue(mapState.contains(1L));
+ long value = mapState.get(1L);
+ assertEquals(5L, value);
+
+ assertTrue(mapState.contains(2L));
+ value = mapState.get(2L);
+ assertEquals(5L, value);
+
+ Iterator<Long> iterator = mapState.values().iterator();
+ while (iterator.hasNext()) {
+ iterator.remove();
+ }
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testClear() {
+ assertTrue(mapState.contains(1L));
+ long value = mapState.get(1L);
+ assertEquals(5L, value);
+
+ assertTrue(mapState.contains(2L));
+ value = mapState.get(2L);
+ assertEquals(5L, value);
+
+ mapState.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
new file mode 100644
index 0000000..9694f55
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableReducingState}.
+ */
+public class ImmutableReducingStateTest {
+
+ private final ReducingStateDescriptor<Long> reducingStateDesc =
+ new ReducingStateDescriptor<>("test", new SumReduce(), BasicTypeInfo.LONG_TYPE_INFO);
+
+ private ImmutableReducingState<Long> reduceState;
+
+ @Before
+ public void setUp() throws Exception {
+ if (!reducingStateDesc.isSerializerInitialized()) {
+ reducingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+ }
+
+ reduceState = ImmutableReducingState.createState(
+ reducingStateDesc,
+ ByteBuffer.allocate(Long.BYTES).putLong(42L).array()
+ );
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testUpdate() {
+ long value = reduceState.get();
+ assertEquals(42L, value);
+
+ reduceState.add(54L);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testClear() {
+ long value = reduceState.get();
+ assertEquals(42L, value);
+
+ reduceState.clear();
+ }
+
+ /**
+ * Test {@link ReduceFunction} summing up its two arguments.
+ */
+ private static class SumReduce implements ReduceFunction<Long> {
+
+ private static final long serialVersionUID = 6041237513913189144L;
+
+ @Override
+ public Long reduce(Long value1, Long value2) throws Exception {
+ return value1 + value2;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
new file mode 100644
index 0000000..a0da43d
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableValueState}.
+ */
+public class ImmutableValueStateTest {
+
+ private final ValueStateDescriptor<Long> valueStateDesc =
+ new ValueStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
+
+ private ImmutableValueState<Long> valueState;
+
+ @Before
+ public void setUp() throws Exception {
+ if (!valueStateDesc.isSerializerInitialized()) {
+ valueStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+ }
+
+ valueState = ImmutableValueState.createState(
+ valueStateDesc,
+ ByteBuffer.allocate(Long.BYTES).putLong(42L).array()
+ );
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testUpdate() {
+ long value = valueState.value();
+ assertEquals(42L, value);
+
+ valueState.update(54L);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testClear() {
+ long value = valueState.value();
+ assertEquals(42L, value);
+
+ valueState.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties b/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..10792cd
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
+log4j.logger.org.apache.zookeeper=OFF
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/pom.xml b/flink-queryable-state/flink-queryable-state-java/pom.xml
deleted file mode 100644
index e60c6f3..0000000
--- a/flink-queryable-state/flink-queryable-state-java/pom.xml
+++ /dev/null
@@ -1,137 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-queryable-state</artifactId>
- <version>1.4-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-queryable-state-java_${scala.binary.version}</artifactId>
- <name>flink-queryable-state-java</name>
- <packaging>jar</packaging>
-
- <dependencies>
-
- <!-- core dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <!-- test dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- ===================================================
- Testing
- =================================================== -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils-junit</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-test</artifactId>
- <version>${curator.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
deleted file mode 100644
index fa2604b..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * Exception to fail Future if the Task Manager on which the
- * {@link org.apache.flink.runtime.query.KvStateClientProxy}
- * is running on, does not know the active Job Manager.
- */
-@Internal
-public class UnknownJobManagerException extends Exception {
-
- private static final long serialVersionUID = 9092442511708951209L;
-
- public UnknownJobManagerException() {
- super("Unknown JobManager. Either the JobManager has not registered yet or has lost leadership.");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
deleted file mode 100644
index c497a72..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.BadRequestException;
-
-/**
- * Thrown if the KvState does not hold any state for the given key or namespace.
- */
-@Internal
-public class UnknownKeyOrNamespaceException extends BadRequestException {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * Creates the exception.
- * @param serverName the name of the server that threw the exception.
- */
- public UnknownKeyOrNamespaceException(String serverName) {
- super(serverName, "No state for the specified key/namespace.");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
deleted file mode 100644
index 59ba081..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.BadRequestException;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Thrown if no KvState with the given ID cannot found by the server handler.
- */
-@Internal
-public class UnknownKvStateIdException extends BadRequestException {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * Creates the exception.
- * @param serverName the name of the server that threw the exception.
- * @param kvStateId the state id for which no state was found.
- */
- public UnknownKvStateIdException(String serverName, KvStateID kvStateId) {
- super(serverName, "No registered state with ID " + Preconditions.checkNotNull(kvStateId) + '.');
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
deleted file mode 100644
index 0d6588a..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.BadRequestException;
-import org.apache.flink.runtime.query.KvStateLocation;
-
-/**
- * Exception thrown if there is no location information available for the given
- * key group in a {@link KvStateLocation} instance.
- */
-@Internal
-public class UnknownKvStateKeyGroupLocationException extends BadRequestException {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * Creates the exception.
- * @param serverName the name of the server that threw the exception.
- */
- public UnknownKvStateKeyGroupLocationException(String serverName) {
- super(serverName, "Unknown key-group location.");
- }
-}