You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/09 14:47:43 UTC
[09/10] flink git commit: [FLINK-3779] [runtime] Add KvState network
client and server
[FLINK-3779] [runtime] Add KvState network client and server
- Adds a Netty-based server and client to query KvState instances, which have
been published to the KvStateRegistry.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af07eed8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af07eed8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af07eed8
Branch: refs/heads/master
Commit: af07eed8e9bed76e54aab04fccc9c424b1e02fa0
Parents: 63c9b8e
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon May 30 14:00:49 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Aug 9 16:42:05 2016 +0200
----------------------------------------------------------------------
.../io/network/netty/NettyBufferPool.java | 2 +-
.../flink/runtime/jobgraph/JobVertexID.java | 11 +-
.../apache/flink/runtime/query/KvStateID.java | 41 ++
.../runtime/query/KvStateServerAddress.java | 87 +++
.../query/netty/AtomicKvStateRequestStats.java | 94 +++
.../runtime/query/netty/ChunkedByteBuf.java | 97 +++
.../netty/DisabledKvStateRequestStats.java | 45 ++
.../runtime/query/netty/KvStateClient.java | 575 +++++++++++++++
.../query/netty/KvStateClientHandler.java | 104 +++
.../netty/KvStateClientHandlerCallback.java | 54 ++
.../query/netty/KvStateRequestStats.java | 53 ++
.../runtime/query/netty/KvStateServer.java | 243 +++++++
.../query/netty/KvStateServerHandler.java | 301 ++++++++
.../query/netty/UnknownKeyOrNamespace.java | 31 +
.../runtime/query/netty/UnknownKvStateID.java | 35 +
.../query/netty/message/KvStateRequest.java | 89 +++
.../netty/message/KvStateRequestFailure.java | 68 ++
.../netty/message/KvStateRequestResult.java | 74 ++
.../netty/message/KvStateRequestSerializer.java | 518 +++++++++++++
.../query/netty/message/KvStateRequestType.java | 40 ++
.../flink/runtime/query/netty/package-info.java | 80 +++
.../runtime/util/DataInputDeserializer.java | 8 +
.../query/netty/KvStateClientHandlerTest.java | 110 +++
.../runtime/query/netty/KvStateClientTest.java | 718 +++++++++++++++++++
.../query/netty/KvStateServerHandlerTest.java | 622 ++++++++++++++++
.../runtime/query/netty/KvStateServerTest.java | 174 +++++
.../message/KvStateRequestSerializerTest.java | 258 +++++++
27 files changed, 4525 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
index 6d09f26..4a88b34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
@@ -57,7 +57,7 @@ public class NettyBufferPool implements ByteBufAllocator {
* @param numberOfArenas Number of arenas (recommended: 2 * number of task
* slots)
*/
- NettyBufferPool(int numberOfArenas) {
+ public NettyBufferPool(int numberOfArenas) {
checkArgument(numberOfArenas >= 1, "Number of arenas");
this.numberOfArenas = numberOfArenas;
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
index 514aabc..1f78b21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
@@ -18,10 +18,10 @@
package org.apache.flink.runtime.jobgraph;
-import javax.xml.bind.DatatypeConverter;
-
import org.apache.flink.util.AbstractID;
+import javax.xml.bind.DatatypeConverter;
+
/**
* A class for statistically unique job vertex IDs.
*/
@@ -32,15 +32,14 @@ public class JobVertexID extends AbstractID {
public JobVertexID() {
super();
}
+ public JobVertexID(byte[] bytes) {
+ super(bytes);
+ }
public JobVertexID(long lowerPart, long upperPart) {
super(lowerPart, upperPart);
}
- public JobVertexID(byte[] bytes) {
- super(bytes);
- }
-
public static JobVertexID fromHexString(String hexString) {
return new JobVertexID(DatatypeConverter.parseHexBinary(hexString));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java
new file mode 100644
index 0000000..bb05833
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.util.AbstractID;
+
+/**
+ * Identifier for {@link KvState} instances.
+ *
+ * <p>Assigned when registering state at the {@link KvStateRegistry}.
+ */
+public class KvStateID extends AbstractID {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates a KvStateID via the no-argument {@link AbstractID} constructor.
+ */
+ public KvStateID() {
+ super();
+ }
+
+ /**
+ * Creates a KvStateID from the two given parts, which are passed unchanged
+ * to the {@link AbstractID} constructor.
+ *
+ * @param lowerPart Lower part, forwarded to {@link AbstractID}
+ * @param upperPart Upper part, forwarded to {@link AbstractID}
+ */
+ public KvStateID(long lowerPart, long upperPart) {
+ super(lowerPart, upperPart);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
new file mode 100644
index 0000000..7887ed1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+
+/**
+ * The (host, port)-address of a {@link KvStateServer}.
+ */
+public class KvStateServerAddress implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Host address of the KvStateServer. */
+    private final InetAddress hostAddress;
+
+    /** Port of the KvStateServer. */
+    private final int port;
+
+    /**
+     * Creates an address from a KvStateServer host address and port.
+     *
+     * <p>The host address must not be null and the port must lie in the
+     * range 1-65535; otherwise the respective precondition check fails.
+     *
+     * @param hostAddress KvStateServer host address
+     * @param port KvStateServer port
+     */
+    public KvStateServerAddress(InetAddress hostAddress, int port) {
+        this.hostAddress = Preconditions.checkNotNull(hostAddress, "Host address");
+
+        boolean portInRange = port > 0 && port <= 65535;
+        Preconditions.checkArgument(portInRange, "Port " + port + " is out of range 1-65535");
+        this.port = port;
+    }
+
+    /**
+     * Returns the host address of the KvStateServer.
+     *
+     * @return KvStateServer host address
+     */
+    public InetAddress getHost() {
+        return hostAddress;
+    }
+
+    /**
+     * Returns the port of the KvStateServer.
+     *
+     * @return KvStateServer port
+     */
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * Two addresses are equal if they are of the same class and agree on
+     * both port and host address.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+
+        KvStateServerAddress other = (KvStateServerAddress) o;
+        if (port != other.port) {
+            return false;
+        }
+        return hostAddress.equals(other.hostAddress);
+    }
+
+    /**
+     * Combines the hash of the host address with the port.
+     */
+    @Override
+    public int hashCode() {
+        return 31 * hostAddress.hashCode() + port;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java
new file mode 100644
index 0000000..2fca4a8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Atomic {@link KvStateRequestStats} implementation.
+ */
+public class AtomicKvStateRequestStats implements KvStateRequestStats {
+
+    /**
+     * Number of active connections.
+     */
+    private final AtomicLong numConnections = new AtomicLong();
+
+    /**
+     * Total number of reported requests.
+     */
+    private final AtomicLong numRequests = new AtomicLong();
+
+    /**
+     * Total number of successful requests (at most the number of reported
+     * requests).
+     */
+    private final AtomicLong numSuccessful = new AtomicLong();
+
+    /**
+     * Total duration of all successful requests.
+     */
+    private final AtomicLong successfulDuration = new AtomicLong();
+
+    /**
+     * Total number of failed requests (at most the number of reported
+     * requests).
+     */
+    private final AtomicLong numFailed = new AtomicLong();
+
+    @Override
+    public void reportActiveConnection() {
+        numConnections.incrementAndGet();
+    }
+
+    @Override
+    public void reportInactiveConnection() {
+        numConnections.decrementAndGet();
+    }
+
+    @Override
+    public void reportRequest() {
+        numRequests.incrementAndGet();
+    }
+
+    @Override
+    public void reportSuccessfulRequest(long durationTotalMillis) {
+        numSuccessful.incrementAndGet();
+        successfulDuration.addAndGet(durationTotalMillis);
+    }
+
+    @Override
+    public void reportFailedRequest() {
+        numFailed.incrementAndGet();
+    }
+
+    /**
+     * Returns the current number of active connections, i.e. reported active
+     * connections minus reported inactive connections.
+     *
+     * @return Number of active connections
+     */
+    public long getNumConnections() {
+        return numConnections.get();
+    }
+
+    /**
+     * Returns the total number of reported requests.
+     *
+     * @return Number of reported requests
+     */
+    public long getNumRequests() {
+        return numRequests.get();
+    }
+
+    /**
+     * Returns the total number of requests reported as successful.
+     *
+     * @return Number of successful requests
+     */
+    public long getNumSuccessful() {
+        return numSuccessful.get();
+    }
+
+    /**
+     * Returns the sum of the durations passed to
+     * {@link #reportSuccessfulRequest(long)}.
+     *
+     * <p>The counter was previously accumulated but could not be read.
+     *
+     * @return Accumulated duration of all successful requests
+     */
+    public long getSuccessfulDuration() {
+        return successfulDuration.get();
+    }
+
+    /**
+     * Returns the total number of requests reported as failed.
+     *
+     * @return Number of failed requests
+     */
+    public long getNumFailed() {
+        return numFailed.get();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java
new file mode 100644
index 0000000..6d32489
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedInput;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link ByteBuf} instance to be consumed in chunks by {@link ChunkedWriteHandler},
+ * respecting the high and low watermarks.
+ *
+ * @see <a href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High Watermarks</a>
+ */
+class ChunkedByteBuf implements ChunkedInput<ByteBuf> {
+
+    /** Buffer that is handed out chunk by chunk. */
+    private final ByteBuf buf;
+
+    /** Maximum number of bytes per chunk. */
+    private final int chunkSize;
+
+    /** Set once {@link #close()} has run. */
+    private boolean isClosed;
+
+    /** Set once the final chunk has been handed out. */
+    private boolean isEndOfInput;
+
+    /**
+     * Creates a chunked view over the given buffer.
+     *
+     * @param buf Buffer to chunk (must not be null)
+     * @param chunkSize Size of the chunks (must be positive)
+     */
+    public ChunkedByteBuf(ByteBuf buf, int chunkSize) {
+        this.buf = Preconditions.checkNotNull(buf, "Buffer");
+        Preconditions.checkArgument(chunkSize > 0, "Non-positive chunk size");
+        this.chunkSize = chunkSize;
+    }
+
+    @Override
+    public boolean isEndOfInput() throws Exception {
+        return isEndOfInput || isClosed;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (isClosed) {
+            return;
+        }
+
+        // As long as the last chunk has not been handed out, the buffer is
+        // still ours to release. Afterwards the consumer of that last chunk
+        // is responsible for it.
+        if (!isEndOfInput) {
+            buf.release();
+        }
+
+        isClosed = true;
+    }
+
+    @Override
+    public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+        if (isClosed) {
+            return null;
+        }
+
+        if (buf.readableBytes() > chunkSize) {
+            // More than one chunk left: hand out a chunk sized slice. The
+            // slice shares its ref count with the original buffer, hence
+            // the additional retain.
+            return buf.readSlice(chunkSize).retain();
+        }
+
+        // Last chunk: no retain, the consumer releases the buffer.
+        isEndOfInput = true;
+        return buf.slice();
+    }
+
+    @Override
+    public String toString() {
+        return "ChunkedByteBuf{buf=" + buf
+            + ", chunkSize=" + chunkSize
+            + ", isClosed=" + isClosed
+            + ", isEndOfInput=" + isEndOfInput
+            + '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java
new file mode 100644
index 0000000..de8824d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+/**
+ * Disabled {@link KvStateRequestStats} implementation.
+ */
+public class DisabledKvStateRequestStats implements KvStateRequestStats {
+
+ // Every callback below is intentionally empty: nothing is recorded.
+
+ @Override
+ public void reportActiveConnection() {
+ // No-op
+ }
+
+ @Override
+ public void reportInactiveConnection() {
+ // No-op
+ }
+
+ @Override
+ public void reportRequest() {
+ // No-op
+ }
+
+ @Override
+ public void reportSuccessfulRequest(long durationTotalMillis) {
+ // No-op, the duration is ignored
+ }
+
+ @Override
+ public void reportFailedRequest() {
+ // No-op
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
new file mode 100644
index 0000000..6cfe86b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import akka.dispatch.Futures;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Netty-based client querying {@link KvStateServer} instances.
+ *
+ * <p>This client can be used by multiple threads concurrently. Operations are
+ * executed asynchronously and return Futures to their result.
+ *
+ * <p>The incoming pipeline looks as follows:
+ * <pre>
+ * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateClientHandler
+ * </pre>
+ *
+ * <p>Received binary messages are expected to contain a frame length field. Netty's
+ * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame before
+ * giving it to our {@link KvStateClientHandler}.
+ *
+ * <p>Connections are established and closed by the client. The server only
+ * closes the connection on a fatal failure that cannot be recovered.
+ */
+public class KvStateClient {
+
+ /** Netty's Bootstrap. */
+ private final Bootstrap bootstrap;
+
+ /** Statistics tracker */
+ private final KvStateRequestStats stats;
+
+ /** Established connections. */
+ private final ConcurrentHashMap<KvStateServerAddress, EstablishedConnection> establishedConnections =
+ new ConcurrentHashMap<>();
+
+ /** Pending connections. */
+ private final ConcurrentHashMap<KvStateServerAddress, PendingConnection> pendingConnections =
+ new ConcurrentHashMap<>();
+
+ /** Atomic shut down flag. */
+ private final AtomicBoolean shutDown = new AtomicBoolean();
+
+    /**
+     * Creates a client with the specified number of event loop threads.
+     *
+     * <p>Both arguments are validated up front, so that an invalid argument
+     * is rejected before the buffer pool and the event loop group are
+     * created.
+     *
+     * @param numEventLoopThreads Number of event loop threads (minimum 1).
+     * @param stats Statistics tracker (must not be null).
+     */
+    public KvStateClient(int numEventLoopThreads, KvStateRequestStats stats) {
+        Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads.");
+        this.stats = Preconditions.checkNotNull(stats, "Statistics tracker");
+
+        NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
+
+        ThreadFactory threadFactory = new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("Flink KvStateClient Event Loop Thread %d")
+            .build();
+
+        NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
+
+        this.bootstrap = new Bootstrap()
+            .group(nioGroup)
+            .channel(NioSocketChannel.class)
+            .option(ChannelOption.ALLOCATOR, bufferPool)
+            .handler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel ch) throws Exception {
+                    ch.pipeline()
+                        // Frames carry a 4 byte length field, which is stripped
+                        .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
+                        // ChunkedWriteHandler respects Channel writability
+                        .addLast(new ChunkedWriteHandler());
+                }
+            });
+    }
+
+ /**
+ * Returns a future holding the serialized request result.
+ *
+ * <p>If the server does not serve a KvState instance with the given ID,
+ * the Future will be failed with a {@link UnknownKvStateID}.
+ *
+ * <p>If the KvState instance does not hold any data for the given key
+ * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
+ *
+ * <p>All other failures are forwarded to the Future.
+ *
+ * @param serverAddress Address of the server to query
+ * @param kvStateId ID of the KvState instance to query
+ * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance with
+ * @return Future holding the serialized result
+ */
+    public Future<byte[]> getKvState(
+            KvStateServerAddress serverAddress,
+            KvStateID kvStateId,
+            byte[] serializedKeyAndNamespace) {
+
+        if (shutDown.get()) {
+            return Futures.failed(new IllegalStateException("Shut down"));
+        }
+
+        // Fast path: the connection has already been established.
+        EstablishedConnection existing = establishedConnections.get(serverAddress);
+        if (existing != null) {
+            return existing.getKvState(kvStateId, serializedKeyAndNamespace);
+        }
+
+        // Somebody else is already connecting, queue behind them.
+        PendingConnection inProgress = pendingConnections.get(serverAddress);
+        if (inProgress != null) {
+            return inProgress.getKvState(kvStateId, serializedKeyAndNamespace);
+        }
+
+        // Nobody is connecting yet, try to register our own attempt.
+        PendingConnection ours = new PendingConnection(serverAddress);
+        PendingConnection winner = pendingConnections.putIfAbsent(serverAddress, ours);
+        if (winner != null) {
+            // Lost the race, use the pending connection that got there first.
+            return winner.getKvState(kvStateId, serializedKeyAndNamespace);
+        }
+
+        // We registered first and are therefore responsible to connect.
+        bootstrap.connect(serverAddress.getHost(), serverAddress.getPort())
+            .addListener(ours);
+
+        return ours.getKvState(kvStateId, serializedKeyAndNamespace);
+    }
+
+    /**
+     * Shuts down the client and closes all connections.
+     *
+     * <p>After a call to this method, all returned futures will be failed.
+     * Only the first call has an effect, subsequent calls return immediately.
+     */
+    public void shutDown() {
+        if (!shutDown.compareAndSet(false, true)) {
+            return;
+        }
+
+        for (Map.Entry<KvStateServerAddress, EstablishedConnection> conn : establishedConnections.entrySet()) {
+            if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
+                conn.getValue().close();
+            }
+        }
+
+        for (KvStateServerAddress address : pendingConnections.keySet()) {
+            // Close the instance that was actually removed from the map and
+            // not the one seen by the iterator, which may be a different one
+            // if the entry changed in the meantime.
+            PendingConnection removed = pendingConnections.remove(address);
+            if (removed != null) {
+                removed.close();
+            }
+        }
+
+        // The bootstrap is a final field set in the constructor, only the
+        // group needs a null check.
+        EventLoopGroup group = bootstrap.group();
+        if (group != null) {
+            group.shutdownGracefully();
+        }
+    }
+
+    /**
+     * Closes the connection to the given server address if it exists.
+     *
+     * <p>If there is a request to the server a new connection will be established.
+     *
+     * @param serverAddress Target address of the connection to close
+     */
+    public void closeConnection(KvStateServerAddress serverAddress) {
+        PendingConnection pending = pendingConnections.get(serverAddress);
+        if (pending != null) {
+            pending.close();
+
+            // Unregister the closed instance. If it stayed in the map, every
+            // later request to this address would be answered by the closed
+            // pending connection and fail instead of triggering a reconnect.
+            // The conditional remove makes sure that we only drop the
+            // instance that we have just closed.
+            pendingConnections.remove(serverAddress, pending);
+        }
+
+        EstablishedConnection established = establishedConnections.remove(serverAddress);
+        if (established != null) {
+            established.close();
+        }
+    }
+
+    /**
+     * A pending connection that is in the process of connecting.
+     *
+     * <p>Requests are queued until the channel is handed in and then forwarded
+     * to the established connection. If the connect fails or the pending
+     * connection is closed, all queued requests are failed and the pending
+     * connection unregisters itself, so that a later request to the same
+     * server triggers a new connection attempt.
+     */
+    private class PendingConnection implements ChannelFutureListener {
+
+        /** Lock to guard the connect call, channel hand in, etc. */
+        private final Object connectLock = new Object();
+
+        /** Address of the server we are connecting to. */
+        private final KvStateServerAddress serverAddress;
+
+        /** Queue of requests while connecting. */
+        private final ArrayDeque<PendingRequest> queuedRequests = new ArrayDeque<>();
+
+        /** The established connection after the connect succeeds. */
+        private EstablishedConnection established;
+
+        /** Closed flag. */
+        private boolean closed;
+
+        /** Failure cause if something goes wrong. */
+        private Throwable failureCause;
+
+        /**
+         * Creates a pending connection to the given server.
+         *
+         * @param serverAddress Address of the server to connect to.
+         */
+        private PendingConnection(KvStateServerAddress serverAddress) {
+            this.serverAddress = serverAddress;
+        }
+
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+            // Callback from the Bootstrap's connect call.
+            if (future.isSuccess()) {
+                handInChannel(future.channel());
+            } else {
+                close(future.cause());
+            }
+        }
+
+        /**
+         * Returns a future holding the serialized request result.
+         *
+         * <p>If the channel has been established, forward the call to the
+         * established channel, otherwise queue it for when the channel is
+         * handed in. If this pending connection has failed or has been
+         * closed, a failed future is returned.
+         *
+         * @param kvStateId ID of the KvState instance to query
+         * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance
+         * with
+         * @return Future holding the serialized result
+         */
+        public Future<byte[]> getKvState(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
+            synchronized (connectLock) {
+                if (failureCause != null) {
+                    return Futures.failed(failureCause);
+                } else if (closed) {
+                    return Futures.failed(new ClosedChannelException());
+                } else {
+                    if (established != null) {
+                        return established.getKvState(kvStateId, serializedKeyAndNamespace);
+                    } else {
+                        // Queue this and handle when connected
+                        PendingRequest pending = new PendingRequest(kvStateId, serializedKeyAndNamespace);
+                        queuedRequests.add(pending);
+                        return pending.promise.future();
+                    }
+                }
+            }
+        }
+
+        /**
+         * Hands in a channel after a successful connection.
+         *
+         * @param channel Channel to hand in
+         */
+        private void handInChannel(Channel channel) {
+            synchronized (connectLock) {
+                if (closed || failureCause != null) {
+                    // Close the channel and we are done. Any queued requests
+                    // are removed on the close/failure call and after that no
+                    // new ones can be enqueued.
+                    channel.close();
+                } else {
+                    established = new EstablishedConnection(serverAddress, channel);
+
+                    PendingRequest pending;
+                    while ((pending = queuedRequests.poll()) != null) {
+                        Future<byte[]> resultFuture = established.getKvState(
+                            pending.kvStateId,
+                            pending.serializedKeyAndNamespace);
+
+                        pending.promise.completeWith(resultFuture);
+                    }
+
+                    // Publish the channel for the general public
+                    establishedConnections.put(serverAddress, established);
+
+                    // Only remove our own registration, never a different
+                    // pending connection registered for the same address.
+                    pendingConnections.remove(serverAddress, this);
+
+                    // Check shut down for possible race with shut down. We
+                    // don't want any lingering connections after shut down,
+                    // which can happen if we don't check this here.
+                    if (shutDown.get()) {
+                        if (establishedConnections.remove(serverAddress, established)) {
+                            established.close();
+                        }
+                    }
+                }
+            }
+        }
+
+        /**
+         * Close the connecting channel with a ClosedChannelException.
+         */
+        private void close() {
+            close(new ClosedChannelException());
+        }
+
+        /**
+         * Close the connecting channel with an Exception (can be
+         * <code>null</code>) or forward to the established channel.
+         *
+         * <p>The first call fails all queued requests with the given cause
+         * and unregisters this pending connection. Subsequent calls are
+         * ignored.
+         */
+        private void close(Throwable cause) {
+            synchronized (connectLock) {
+                if (!closed) {
+                    if (failureCause == null) {
+                        failureCause = cause;
+                    }
+
+                    if (established != null) {
+                        established.close();
+                    } else {
+                        PendingRequest pending;
+                        while ((pending = queuedRequests.poll()) != null) {
+                            pending.promise.tryFailure(cause);
+                        }
+                    }
+
+                    closed = true;
+
+                    // Unregister this instance. A closed or failed pending
+                    // connection that stays registered answers every later
+                    // request to this address with a failed future and thereby
+                    // prevents any reconnect. The conditional remove is a
+                    // no-op if this instance is not registered (any more).
+                    pendingConnections.remove(serverAddress, this);
+                }
+            }
+        }
+
+        /**
+         * A pending request queued while the channel is connecting.
+         */
+        private final class PendingRequest {
+
+            private final KvStateID kvStateId;
+            private final byte[] serializedKeyAndNamespace;
+            private final Promise<byte[]> promise;
+
+            private PendingRequest(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
+                this.kvStateId = kvStateId;
+                this.serializedKeyAndNamespace = serializedKeyAndNamespace;
+                this.promise = Futures.promise();
+            }
+        }
+
+        @Override
+        public String toString() {
+            synchronized (connectLock) {
+                return "PendingConnection{" +
+                    "serverAddress=" + serverAddress +
+                    ", queuedRequests=" + queuedRequests.size() +
+                    ", established=" + (established != null) +
+                    ", closed=" + closed +
+                    '}';
+            }
+        }
+    }
+
+ /**
+ * An established connection that wraps the actual channel instance and is
+ * registered at the {@link KvStateClientHandler} for callbacks.
+ */
+ private class EstablishedConnection implements KvStateClientHandlerCallback {
+
+ /** Address of the server we are connected to. */
+ private final KvStateServerAddress serverAddress;
+
+ /** The actual TCP channel. */
+ private final Channel channel;
+
+ /** Pending requests keyed by request ID. */
+ private final ConcurrentHashMap<Long, PromiseAndTimestamp> pendingRequests = new ConcurrentHashMap<>();
+
+ /** Current request number used to assign unique request IDs. */
+ private final AtomicLong requestCount = new AtomicLong();
+
+ /** Reference to a failure that was reported by the channel. */
+ private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
+
+ /**
+ * Creates an established connection with the given channel.
+ *
+ * @param serverAddress Address of the server connected to
+ * @param channel The actual TCP channel
+ */
+ EstablishedConnection(KvStateServerAddress serverAddress, Channel channel) {
+ this.serverAddress = Preconditions.checkNotNull(serverAddress, "KvStateServerAddress");
+ this.channel = Preconditions.checkNotNull(channel, "Channel");
+
+ // Add the client handler with the callback
+ channel.pipeline().addLast("KvStateClientHandler", new KvStateClientHandler(this));
+
+ stats.reportActiveConnection();
+ }
+
+ /**
+ * Close the channel with a ClosedChannelException.
+ */
+ void close() {
+ close(new ClosedChannelException());
+ }
+
+ /**
+  * Closes the channel with a cause and fails all pending requests with it.
+  *
+  * <p>Only the first call has an effect. The failure cause reference is
+  * used as the guard, so a close counts as a failure as well.
+  *
+  * @param cause The cause to close the channel with.
+  * @return <code>true</code> if this call closed the connection,
+  *         <code>false</code> if a failure cause had been set before
+  */
+ private boolean close(Throwable cause) {
+     if (!failureCause.compareAndSet(null, cause)) {
+         // A failure cause was set before this call
+         return false;
+     }
+
+     channel.close();
+     stats.reportInactiveConnection();
+
+     // Removal decides who fails the promise, so that every request is
+     // counted as failed at most once.
+     for (Long pendingId : pendingRequests.keySet()) {
+         PromiseAndTimestamp removed = pendingRequests.remove(pendingId);
+         if (removed != null && removed.promise.tryFailure(cause)) {
+             stats.reportFailedRequest();
+         }
+     }
+
+     return true;
+ }
+
+ /**
+  * Returns a future holding the serialized request result.
+  *
+  * <p>The request is registered as pending under a fresh request ID before
+  * it is written to the channel. Whoever removes the pending entry (response
+  * callback, write listener, close, or this method on an error) completes
+  * the promise, so that each request is counted at most once in the stats.
+  *
+  * @param kvStateId ID of the KvState instance to query
+  * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance
+  * with
+  * @return Future holding the serialized result
+  */
+ Future<byte[]> getKvState(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
+     final PromiseAndTimestamp requestPromiseTs = new PromiseAndTimestamp(
+         Futures.<byte[]>promise(),
+         System.nanoTime());
+
+     // Assigned outside of the try block, because the catch block needs
+     // the ID for the clean up.
+     final long requestId = requestCount.getAndIncrement();
+
+     try {
+         pendingRequests.put(requestId, requestPromiseTs);
+
+         stats.reportRequest();
+
+         ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequest(
+             channel.alloc(),
+             requestId,
+             kvStateId,
+             serializedKeyAndNamespace);
+
+         channel.writeAndFlush(buf).addListener(new ChannelFutureListener() {
+             @Override
+             public void operationComplete(ChannelFuture future) throws Exception {
+                 if (!future.isSuccess()) {
+                     // Fail the promise if the write failed
+                     PromiseAndTimestamp pending = pendingRequests.remove(requestId);
+                     if (pending != null && pending.promise.tryFailure(future.cause())) {
+                         stats.reportFailedRequest();
+                     }
+                 }
+             }
+         });
+
+         // Check failure for possible race. We don't want any lingering
+         // promises after a failure, which can happen if we don't check
+         // this here. Note that close is treated as a failure as well.
+         Throwable failure = failureCause.get();
+         if (failure != null) {
+             // Remove from pending requests to guard against concurrent
+             // removal and to make sure that we only count it once as failed.
+             PromiseAndTimestamp p = pendingRequests.remove(requestId);
+             if (p != null && p.promise.tryFailure(failure)) {
+                 stats.reportFailedRequest();
+             }
+         }
+     } catch (Throwable t) {
+         // Don't leave the entry behind (e.g. if the serialization failed),
+         // otherwise it lingers until the connection is closed and the
+         // request is never counted as failed.
+         pendingRequests.remove(requestId);
+         if (requestPromiseTs.promise.tryFailure(t)) {
+             stats.reportFailedRequest();
+         }
+     }
+
+     return requestPromiseTs.promise.future();
+ }
+
+ @Override
+ public void onRequestResult(long requestId, byte[] serializedValue) {
+     final PromiseAndTimestamp answered = pendingRequests.remove(requestId);
+     if (answered == null || !answered.promise.trySuccess(serializedValue)) {
+         // Unknown request or promise completed by someone else
+         return;
+     }
+
+     final long elapsedNanos = System.nanoTime() - answered.timestamp;
+     stats.reportSuccessfulRequest(elapsedNanos / 1_000_000);
+ }
+
+ @Override
+ public void onRequestFailure(long requestId, Throwable cause) {
+     final PromiseAndTimestamp failed = pendingRequests.remove(requestId);
+     if (failed == null) {
+         // Unknown request
+         return;
+     }
+
+     // Only count the failure if this call completed the promise
+     if (failed.promise.tryFailure(cause)) {
+         stats.reportFailedRequest();
+     }
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+     final boolean closedByThisCall = close(cause);
+     if (!closedByThisCall) {
+         return;
+     }
+
+     // Unregister this connection, otherwise future requests
+     // will be handled by this failed channel.
+     establishedConnections.remove(serverAddress, this);
+ }
+
+ @Override
+ public String toString() {
+     final int numPending = pendingRequests.size();
+
+     return "EstablishedConnection{" +
+         "serverAddress=" + serverAddress +
+         ", channel=" + channel +
+         ", pendingRequests=" + numPending +
+         ", requestCount=" + requestCount +
+         ", failureCause=" + failureCause +
+         '}';
+ }
+
+ /**
+ * Pair of promise and a timestamp.
+ *
+ * <p>The timestamp is a <code>System.nanoTime()</code> value taken when
+ * the request is created. It is used to compute the request duration
+ * once the result arrives.
+ */
+ private class PromiseAndTimestamp {
+
+ private final Promise<byte[]> promise; // completed with the serialized result or failed with a cause
+ private final long timestamp; // System.nanoTime() value, only meaningful as a difference
+
+ public PromiseAndTimestamp(Promise<byte[]> promise, long timestamp) {
+ this.promise = promise;
+ this.timestamp = timestamp;
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java
new file mode 100644
index 0000000..2166bf2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestFailure;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * This handler expects responses from {@link KvStateServerHandler}.
+ *
+ * <p>It deserializes the response and calls the registered callback, which is
+ * responsible for actually handling the result (see {@link KvStateClient.EstablishedConnection}).
+ */
+class KvStateClientHandler extends ChannelInboundHandlerAdapter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KvStateClientHandler.class);
+
+    /** Receiver of all results and failures seen by this handler. */
+    private final KvStateClientHandlerCallback callback;
+
+    /**
+     * Creates a {@link KvStateClientHandler} with the callback.
+     *
+     * @param callback Callback for responses.
+     */
+    KvStateClientHandler(KvStateClientHandlerCallback callback) {
+        this.callback = callback;
+    }
+
+    /**
+     * Deserializes the response and hands it to the callback. Anything that
+     * goes wrong is reported as a general failure. The message is always
+     * released.
+     */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        try {
+            final ByteBuf response = (ByteBuf) msg;
+            final KvStateRequestType responseType = KvStateRequestSerializer.deserializeHeader(response);
+
+            if (responseType == KvStateRequestType.REQUEST_RESULT) {
+                final KvStateRequestResult requestResult =
+                    KvStateRequestSerializer.deserializeKvStateRequestResult(response);
+                callback.onRequestResult(requestResult.getRequestId(), requestResult.getSerializedResult());
+            } else if (responseType == KvStateRequestType.REQUEST_FAILURE) {
+                final KvStateRequestFailure requestFailure =
+                    KvStateRequestSerializer.deserializeKvStateRequestFailure(response);
+                callback.onRequestFailure(requestFailure.getRequestId(), requestFailure.getCause());
+            } else if (responseType == KvStateRequestType.SERVER_FAILURE) {
+                throw KvStateRequestSerializer.deserializeServerFailure(response);
+            } else {
+                throw new IllegalStateException("Unexpected response type '" + responseType + "'");
+            }
+        } catch (Throwable t) {
+            notifyFailure(t);
+        } finally {
+            ReferenceCountUtil.release(msg);
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        notifyFailure(cause);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        // Only the client is expected to close the channel. Otherwise it
+        // indicates a failure. Note that this will be invoked in both cases
+        // though. If the callback closed the channel, the callback must be
+        // ignored.
+        notifyFailure(new ClosedChannelException());
+    }
+
+    /**
+     * Reports a general failure to the callback. Exceptions thrown by the
+     * callback itself are only logged.
+     *
+     * @param cause Failure to report
+     */
+    private void notifyFailure(Throwable cause) {
+        try {
+            callback.onFailure(cause);
+        } catch (Throwable t) {
+            LOG.error("Failed to notify callback about failure", t);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java
new file mode 100644
index 0000000..65ff781
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import org.apache.flink.runtime.query.netty.message.KvStateRequest;
+
+/**
+ * Callback for {@link KvStateClientHandler}.
+ *
+ * <p>For each received response, the handler invokes the method matching
+ * the deserialized message type. {@link #onFailure(Throwable)} is used for
+ * everything that is not tied to a single request.
+ */
+interface KvStateClientHandlerCallback {
+
+ /**
+ * Called on a successful {@link KvStateRequest}.
+ *
+ * <p>Invoked by the handler for a received request result message.
+ *
+ * @param requestId ID of the request
+ * @param serializedValue Serialized value for the request
+ */
+ void onRequestResult(long requestId, byte[] serializedValue);
+
+ /**
+ * Called on a failed {@link KvStateRequest}.
+ *
+ * <p>Invoked by the handler for a received request failure message.
+ *
+ * @param requestId ID of the request
+ * @param cause Cause of the request failure
+ */
+ void onRequestFailure(long requestId, Throwable cause);
+
+ /**
+ * Called on any failure, which is not related to a specific request.
+ *
+ * <p>This can be for example a caught Exception in the channel pipeline
+ * or an unexpected channel close.
+ *
+ * <p>The handler also uses this method for received server failure
+ * messages, for unexpected message types and for any error thrown
+ * while a response is handled. It is invoked whenever the channel
+ * becomes inactive, too.
+ *
+ * @param cause Cause of the failure
+ */
+ void onFailure(Throwable cause);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
new file mode 100644
index 0000000..1c0d8d5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+/**
+ * Simple statistics for {@link KvStateServer} monitoring.
+ *
+ * <p>NOTE(review): the established connections of the KvStateClient call
+ * methods with the same names on their <code>stats</code> object. That is
+ * presumably an instance of this interface; confirm and mention the client
+ * here as well.
+ */
+public interface KvStateRequestStats {
+
+ /**
+ * Reports an active connection.
+ */
+ void reportActiveConnection();
+
+ /**
+ * Reports an inactive connection.
+ */
+ void reportInactiveConnection();
+
+ /**
+ * Reports an incoming request.
+ */
+ void reportRequest();
+
+ /**
+ * Reports a successfully handled request.
+ *
+ * <p>The server handler reports the time from the creation of the query
+ * task until the result has been written to the channel.
+ *
+ * @param durationTotalMillis Duration of the request (in milliseconds).
+ */
+ void reportSuccessfulRequest(long durationTotalMillis);
+
+ /**
+ * Reports a failure during a request.
+ */
+ void reportFailedRequest();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
new file mode 100644
index 0000000..0c0c5ec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.concurrent.Future;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.message.KvStateRequest;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Netty-based server answering {@link KvStateRequest} messages.
+ *
+ * <p>Requests are handled by asynchronous query tasks (see {@link KvStateServerHandler.AsyncKvStateQueryTask})
+ * that are executed by a separate query Thread pool. This pool is shared among
+ * all TCP connections.
+ *
+ * <p>The incoming pipeline looks as follows:
+ * <pre>
+ * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateServerHandler
+ * </pre>
+ *
+ * <p>Received binary messages are expected to contain a frame length field. Netty's
+ * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame before
+ * giving it to our {@link KvStateServerHandler}.
+ *
+ * <p>Connections are established and closed by the client. The server only
+ * closes the connection on a fatal failure that cannot be recovered. A
+ * server-side connection close is considered a failure by the client.
+ */
+public class KvStateServer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KvStateServer.class);
+
+ /** Server config: low water mark */
+ private static final int LOW_WATER_MARK = 8 * 1024;
+
+ /** Server config: high water mark */
+ private static final int HIGH_WATER_MARK = 32 * 1024;
+
+ /** Netty's ServerBootstrap. */
+ private final ServerBootstrap bootstrap;
+
+ /** Query executor thread pool. */
+ private final ExecutorService queryExecutor;
+
+ /** Address of this server. */
+ private KvStateServerAddress serverAddress;
+
+ /**
+  * Creates the {@link KvStateServer}.
+  *
+  * <p>The server needs to be started via {@link #start()} in order to bind
+  * to the configured bind address.
+  *
+  * @param bindAddress Address to bind to
+  * @param bindPort Port to bind to (0-65535). Pick random port if 0.
+  * @param numEventLoopThreads Number of event loop threads
+  * @param numQueryThreads Number of query threads
+  * @param kvStateRegistry KvStateRegistry to query for KvState instances
+  * @param stats Statistics tracker
+  */
+ public KvStateServer(
+         InetAddress bindAddress,
+         int bindPort,
+         int numEventLoopThreads,
+         int numQueryThreads,
+         KvStateRegistry kvStateRegistry,
+         KvStateRequestStats stats) {
+
+     // The highest valid port number is 65535 (ports are 16 bit values)
+     Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65535, "Port " + bindPort +
+         " is out of valid port range (0-65535).");
+
+     Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads.");
+     Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads.");
+
+     Preconditions.checkNotNull(kvStateRegistry, "KvStateRegistry");
+     Preconditions.checkNotNull(stats, "KvStateRequestStats");
+
+     NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
+
+     ThreadFactory threadFactory = new ThreadFactoryBuilder()
+         .setDaemon(true)
+         .setNameFormat("Flink KvStateServer EventLoop Thread %d")
+         .build();
+
+     NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
+
+     queryExecutor = createQueryExecutor(numQueryThreads);
+
+     // Shared between all channels
+     KvStateServerHandler serverHandler = new KvStateServerHandler(
+         kvStateRegistry,
+         queryExecutor,
+         stats);
+
+     bootstrap = new ServerBootstrap()
+         // Bind address and port
+         .localAddress(bindAddress, bindPort)
+         // NIO server channels
+         .group(nioGroup)
+         .channel(NioServerSocketChannel.class)
+         // Server channel Options
+         .option(ChannelOption.ALLOCATOR, bufferPool)
+         // Child channel options
+         .childOption(ChannelOption.ALLOCATOR, bufferPool)
+         .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
+         .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
+         // See initializer for pipeline details
+         .childHandler(new KvStateServerChannelInitializer(serverHandler));
+ }
+
+ /**
+  * Binds the server to the configured address and blocks until the bind
+  * operation has finished. Afterwards the actually bound address is
+  * available via {@link #getAddress()}.
+  *
+  * @throws InterruptedException If interrupted during the bind operation
+  */
+ public void start() throws InterruptedException {
+     final Channel serverChannel = bootstrap.bind().sync().channel();
+
+     // Ask the channel, because the configured port may have been 0
+     final InetSocketAddress bound = (InetSocketAddress) serverChannel.localAddress();
+     serverAddress = new KvStateServerAddress(bound.getAddress(), bound.getPort());
+ }
+
+ /**
+  * Returns the address this server has been bound to.
+  *
+  * @return Server address
+  * @throws IllegalStateException If server has not been started yet
+  */
+ public KvStateServerAddress getAddress() {
+     if (serverAddress != null) {
+         return serverAddress;
+     }
+
+     throw new IllegalStateException("KvStateServer not started yet.");
+ }
+
+ /**
+  * Shuts down the server and all related thread pools.
+  *
+  * <p>Waits for the event loop group to shut down. If the calling thread
+  * is interrupted while waiting, the wait is abandoned, the interrupt flag
+  * is restored for the caller and the remaining shut down steps are still
+  * executed.
+  */
+ public void shutDown() {
+     if (bootstrap != null) {
+         EventLoopGroup group = bootstrap.group();
+         if (group != null) {
+             Future<?> shutDownFuture = group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
+             try {
+                 shutDownFuture.await();
+             } catch (InterruptedException e) {
+                 LOG.error("Interrupted during shut down", e);
+                 // Don't swallow the interrupt: callers further up the
+                 // stack must be able to observe it.
+                 Thread.currentThread().interrupt();
+             }
+         }
+     }
+
+     if (queryExecutor != null) {
+         queryExecutor.shutdown();
+     }
+
+     serverAddress = null;
+ }
+
+ /**
+ * Creates a thread pool for the query execution.
+ *
+ * @param numQueryThreads Number of query threads.
+ * @return Thread pool for query execution
+ */
+ private static ExecutorService createQueryExecutor(int numQueryThreads) {
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("Flink KvStateServer Query Thread %d")
+ .build();
+
+ return Executors.newFixedThreadPool(numQueryThreads, threadFactory);
+ }
+
+ /**
+  * Initializer for the pipeline of each accepted channel.
+  *
+  * <p>Only the request handler is shared between channels. The chunked
+  * write handler and the frame decoder are created for every channel.
+  */
+ private static final class KvStateServerChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+     /** The shared request handler. */
+     private final KvStateServerHandler sharedRequestHandler;
+
+     /**
+      * Creates the channel pipeline initializer with the shared request handler.
+      *
+      * @param sharedRequestHandler Shared request handler.
+      */
+     public KvStateServerChannelInitializer(KvStateServerHandler sharedRequestHandler) {
+         this.sharedRequestHandler = Preconditions.checkNotNull(sharedRequestHandler, "Request handler");
+     }
+
+     @Override
+     protected void initChannel(SocketChannel ch) throws Exception {
+         // Per channel handlers
+         final ChunkedWriteHandler chunkedWriter = new ChunkedWriteHandler();
+         final LengthFieldBasedFrameDecoder frameDecoder =
+             new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4);
+
+         ch.pipeline()
+             .addLast(chunkedWriter)
+             .addLast(frameDecoder)
+             .addLast(sharedRequestHandler);
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
new file mode 100644
index 0000000..47f2ad6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.netty.message.KvStateRequest;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * This handler dispatches asynchronous tasks, which query {@link KvState}
+ * instances and write the result to the channel.
+ *
+ * <p>The network threads receive the message, deserialize it and dispatch the
+ * query task. The actual query is handled in a separate thread as it might
+ * otherwise block the network threads (file I/O etc.).
+ */
+@ChannelHandler.Sharable
+class KvStateServerHandler extends ChannelInboundHandlerAdapter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class);
+
+ /** KvState registry holding references to the KvState instances. */
+ private final KvStateRegistry registry;
+
+ /** Thread pool for query execution. */
+ private final ExecutorService queryExecutor;
+
+ /** Exposed server statistics. */
+ private final KvStateRequestStats stats;
+
+ /**
+  * Creates the handler. None of the arguments may be <code>null</code>.
+  *
+  * @param kvStateRegistry Registry to query.
+  * @param queryExecutor Thread pool for query execution.
+  * @param stats Exposed server statistics.
+  */
+ KvStateServerHandler(
+         KvStateRegistry kvStateRegistry,
+         ExecutorService queryExecutor,
+         KvStateRequestStats stats) {
+
+     Objects.requireNonNull(kvStateRegistry, "KvStateRegistry");
+     Objects.requireNonNull(queryExecutor, "Query thread pool");
+     Objects.requireNonNull(stats, "KvStateRequestStats");
+
+     this.registry = kvStateRegistry;
+     this.queryExecutor = queryExecutor;
+     this.stats = stats;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception { // only updates the stats; the event is not forwarded via ctx
+ stats.reportActiveConnection();
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception { // only updates the stats; the event is not forwarded via ctx
+ stats.reportInactiveConnection();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // handles one incoming message; msg is cast to ByteBuf below
+ KvStateRequest request = null; // stays null until a request has been deserialized; selects the failure type in the catch block
+
+ try {
+ ByteBuf buf = (ByteBuf) msg;
+ KvStateRequestType msgType = KvStateRequestSerializer.deserializeHeader(buf);
+
+ if (msgType == KvStateRequestType.REQUEST) {
+ // ------------------------------------------------------------
+ // Request
+ // ------------------------------------------------------------
+ request = KvStateRequestSerializer.deserializeKvStateRequest(buf);
+
+ stats.reportRequest();
+
+ KvState<?, ?, ?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
+
+ if (kvState != null) {
+ // Execute actual query async, because it is possibly
+ // blocking (e.g. file I/O).
+ //
+ // A submission failure is not treated as fatal.
+ queryExecutor.submit(new AsyncKvStateQueryTask(ctx, request, kvState, stats)); // the returned Future is not used; the task writes the response itself
+ } else {
+ ByteBuf unknown = KvStateRequestSerializer.serializeKvStateRequestFailure( // no KvState registered under the ID: answer with a request failure
+ ctx.alloc(),
+ request.getRequestId(),
+ new UnknownKvStateID(request.getKvStateId()));
+
+ ctx.writeAndFlush(unknown);
+
+ stats.reportFailedRequest();
+ }
+ } else {
+ // ------------------------------------------------------------
+ // Unexpected
+ // ------------------------------------------------------------
+ ByteBuf failure = KvStateRequestSerializer.serializeServerFailure( // not tied to a request ID, hence a server failure
+ ctx.alloc(),
+ new IllegalArgumentException("Unexpected message type " + msgType
+ + ". KvStateServerHandler expects "
+ + KvStateRequestType.REQUEST + " messages."));
+
+ ctx.writeAndFlush(failure);
+ }
+ } catch (Throwable t) {
+ String stringifiedCause = ExceptionUtils.stringifyException(t); // the cause is sent as text inside a new RuntimeException
+
+ ByteBuf err;
+ if (request != null) {
+ String errMsg = "Failed to handle incoming request with ID " +
+ request.getRequestId() + ". Caused by: " + stringifiedCause;
+ err = KvStateRequestSerializer.serializeKvStateRequestFailure(
+ ctx.alloc(),
+ request.getRequestId(),
+ new RuntimeException(errMsg));
+
+ stats.reportFailedRequest();
+ } else {
+ String errMsg = "Failed to handle incoming message. Caused by: " + stringifiedCause; // failed before the request was deserialized: no request ID available
+ err = KvStateRequestSerializer.serializeServerFailure(
+ ctx.alloc(),
+ new RuntimeException(errMsg));
+ }
+
+ ctx.writeAndFlush(err);
+ } finally {
+ // IMPORTANT: We have to always recycle the incoming buffer.
+ // Otherwise we will leak memory out of Netty's buffer pool.
+ //
+ // If any operation ever holds on to the buffer, it is the
+ // responsibility of that operation to retain the buffer and
+ // release it later.
+ ReferenceCountUtil.release(msg);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+     final String errMsg = "Exception in server pipeline. Caused by: " +
+         ExceptionUtils.stringifyException(cause);
+
+     final ByteBuf serverFailure = KvStateRequestSerializer.serializeServerFailure(
+         ctx.alloc(),
+         new RuntimeException(errMsg));
+
+     // Tell the client what happened and close the channel afterwards
+     ctx.writeAndFlush(serverFailure).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ /**
+ * Task to execute the actual query against the {@link KvState} instance.
+ */
+ private static class AsyncKvStateQueryTask implements Runnable {
+
+ private final ChannelHandlerContext ctx;
+
+ private final KvStateRequest request;
+
+ private final KvState<?, ?, ?, ?, ?> kvState;
+
+ private final KvStateRequestStats stats;
+
+ private final long creationNanos;
+
+ public AsyncKvStateQueryTask(
+         ChannelHandlerContext ctx,
+         KvStateRequest request,
+         KvState<?, ?, ?, ?, ?> kvState,
+         KvStateRequestStats stats) {
+
+     Objects.requireNonNull(ctx, "Channel handler context");
+     Objects.requireNonNull(request, "State query");
+     Objects.requireNonNull(kvState, "KvState");
+     Objects.requireNonNull(stats, "State query stats");
+
+     this.ctx = ctx;
+     this.request = request;
+     this.kvState = kvState;
+     this.stats = stats;
+
+     // Start of the duration that is reported with the request stats
+     this.creationNanos = System.nanoTime();
+ }
+
+ /**
+  * Queries the KvState instance and writes the response to the channel.
+  *
+  * <p>A result is written as a request result (chunked, if it is larger
+  * than the write buffer high water mark). A <code>null</code> result and
+  * any error are answered with a request failure. Every outcome other than
+  * a submitted result write is reported as a failed request.
+  */
+ @Override
+ public void run() {
+     boolean success = false;
+
+     try {
+         if (!ctx.channel().isActive()) {
+             // Nobody to respond to (reported as failed in finally)
+             return;
+         }
+
+         // Query the KvState instance
+         byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
+         byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
+
+         if (serializedResult != null) {
+             // We found some data, success!
+             ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequestResult(
+                 ctx.alloc(),
+                 request.getRequestId(),
+                 serializedResult);
+
+             int highWatermark = ctx.channel().config().getWriteBufferHighWaterMark();
+
+             ChannelFuture write;
+             if (buf.readableBytes() <= highWatermark) {
+                 write = ctx.writeAndFlush(buf);
+             } else {
+                 write = ctx.writeAndFlush(new ChunkedByteBuf(buf, highWatermark));
+             }
+
+             // The listener reports the final outcome of the write
+             write.addListener(new QueryResultWriteListener());
+
+             success = true;
+         } else {
+             // No data for the key/namespace. This is considered to be
+             // a failure.
+             ByteBuf unknownKey = KvStateRequestSerializer.serializeKvStateRequestFailure(
+                 ctx.alloc(),
+                 request.getRequestId(),
+                 new UnknownKeyOrNamespace());
+
+             ctx.writeAndFlush(unknownKey);
+         }
+     } catch (Throwable t) {
+         try {
+             String stringifiedCause = ExceptionUtils.stringifyException(t);
+             String errMsg = "Failed to query state backend for query " +
+                 request.getRequestId() + ". Caused by: " + stringifiedCause;
+
+             ByteBuf err = KvStateRequestSerializer.serializeKvStateRequestFailure(
+                 ctx.alloc(), request.getRequestId(), new RuntimeException(errMsg));
+
+             ctx.writeAndFlush(err);
+         } catch (Throwable e) {
+             // Catch everything here: this task is handed to the executor
+             // via submit() and the returned Future is never inspected, so
+             // anything escaping this method would be lost silently.
+             LOG.error("Failed to respond with the error after failed to query state backend", e);
+         }
+     } finally {
+         if (!success) {
+             stats.reportFailedRequest();
+         }
+     }
+ }
+
+ @Override
+ public String toString() {
+     // No separator in front of the first field
+     return "AsyncKvStateQueryTask{" +
+         "request=" + request +
+         ", creationNanos=" + creationNanos +
+         '}';
+ }
+
+ /**
+  * Listener for the write of a query result.
+  *
+  * <p>Reports the request as successful (with its duration since the task
+  * was created) or as failed, depending on the outcome of the write.
+  */
+ private class QueryResultWriteListener implements ChannelFutureListener {
+
+     @Override
+     public void operationComplete(ChannelFuture future) throws Exception {
+         final long elapsedMillis = (System.nanoTime() - creationNanos) / 1_000_000;
+
+         if (future.isSuccess()) {
+             stats.reportSuccessfulRequest(elapsedMillis);
+             return;
+         }
+
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("Query " + request + " failed after " + elapsedMillis + " ms", future.cause());
+         }
+
+         stats.reportFailedRequest();
+     }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java
new file mode 100644
index 0000000..4e5a1de
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+/**
+ * Thrown if the queried KvState does not hold any state for the requested key or namespace.
+ */
+public class UnknownKeyOrNamespace extends IllegalStateException {
+
+ private static final long serialVersionUID = 1L;
+
+ UnknownKeyOrNamespace() {
+ super("KvState does not hold any state for key/namespace.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java
new file mode 100644
index 0000000..cc60035
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty;
+
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Thrown if no KvState with the given ID can be found by the server handler.
+ */
+public class UnknownKvStateID extends IllegalStateException {
+
+ private static final long serialVersionUID = 1L;
+
+ public UnknownKvStateID(KvStateID kvStateId) {
+ super("No KvState registered with ID " + Preconditions.checkNotNull(kvStateId, "KvStateID") +
+ " at TaskManager.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
new file mode 100644
index 0000000..0abb653
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty.message;
+
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link KvState} instance request for a specific key and namespace.
+ */
+public final class KvStateRequest {
+
+ /** ID for this request. */
+ private final long requestId;
+
+ /** ID of the requested KvState instance. */
+ private final KvStateID kvStateId;
+
+ /** Serialized key and namespace to request from the KvState instance. */
+ private final byte[] serializedKeyAndNamespace;
+
+ /**
+ * Creates a KvState instance request.
+ *
+ * @param requestId ID for this request
+ * @param kvStateId ID of the requested KvState instance (must not be null)
+ * @param serializedKeyAndNamespace Serialized key and namespace to request from the KvState
+ * instance (must not be null)
+ */
+ KvStateRequest(long requestId, KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
+ this.requestId = requestId;
+ this.kvStateId = Preconditions.checkNotNull(kvStateId, "KvStateID");
+ this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+ }
+
+ /**
+ * Returns the request ID.
+ *
+ * @return Request ID
+ */
+ public long getRequestId() {
+ return requestId;
+ }
+
+ /**
+ * Returns the ID of the requested KvState instance.
+ *
+ * @return ID of the requested KvState instance
+ */
+ public KvStateID getKvStateId() {
+ return kvStateId;
+ }
+
+ /**
+ * Returns the serialized key and namespace to request from the KvState
+ * instance. The internal array is returned as is (no defensive copy).
+ *
+ * @return Serialized key and namespace to request from the KvState instance
+ */
+ public byte[] getSerializedKeyAndNamespace() {
+ return serializedKeyAndNamespace;
+ }
+
+ @Override
+ public String toString() {
+ return "KvStateRequest{" +
+ "requestId=" + requestId +
+ ", kvStateId=" + kvStateId +
+ ", serializedKeyAndNamespace.length=" + serializedKeyAndNamespace.length + // length only, not the bytes
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java
new file mode 100644
index 0000000..06a3ce8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty.message;
+
+/**
+ * A failure response to a {@link KvStateRequest}.
+ */
+public final class KvStateRequestFailure {
+
+ /** ID of the request this failure responds to. */
+ private final long requestId;
+
+ /** Failure cause. Not allowed to be a user type. */
+ private final Throwable cause;
+
+ /**
+ * Creates a failure response to a {@link KvStateRequest}.
+ *
+ * @param requestId ID of the request this failure responds to
+ * @param cause Failure cause (not allowed to be a user type); stored without a null check
+ */
+ KvStateRequestFailure(long requestId, Throwable cause) {
+ this.requestId = requestId;
+ this.cause = cause;
+ }
+
+ /**
+ * Returns the ID of the request this failure responds to.
+ *
+ * @return ID of the request this failure responds to
+ */
+ public long getRequestId() {
+ return requestId;
+ }
+
+ /**
+ * Returns the failure cause.
+ *
+ * @return Failure cause
+ */
+ public Throwable getCause() {
+ return cause;
+ }
+
+ @Override
+ public String toString() {
+ return "KvStateRequestFailure{" +
+ "requestId=" + requestId +
+ ", cause=" + cause +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java
new file mode 100644
index 0000000..2bd8a36
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty.message;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A successful response to a {@link KvStateRequest} containing the serialized
+ * result for the requested key and namespace.
+ */
+public final class KvStateRequestResult {
+
+ /** ID of the request this result responds to. */
+ private final long requestId;
+
+ /**
+ * Serialized result for the requested key and namespace. Checked to be
+ * non-<code>null</code> via Preconditions.checkNotNull in the constructor.
+ */
+ private final byte[] serializedResult;
+
+ /**
+ * Creates a successful {@link KvStateRequestResult} response.
+ *
+ * @param requestId ID of the request this result responds to
+ * @param serializedResult Serialized result (must not be <code>null</code>)
+ */
+ KvStateRequestResult(long requestId, byte[] serializedResult) {
+ this.requestId = requestId;
+ this.serializedResult = Preconditions.checkNotNull(serializedResult, "Serialization result");
+ }
+
+ /**
+ * Returns the ID of the request this result responds to.
+ *
+ * @return ID of the request this result responds to
+ */
+ public long getRequestId() {
+ return requestId;
+ }
+
+ /**
+ * Returns the serialized result (the internal array, no defensive copy).
+ *
+ * @return Serialized result as passed to the constructor.
+ */
+ public byte[] getSerializedResult() {
+ return serializedResult;
+ }
+
+ @Override
+ public String toString() {
+ return "KvStateRequestResult{" +
+ "requestId=" + requestId +
+ ", serializedResult.length=" + serializedResult.length +
+ '}';
+ }
+}