You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/26 17:01:57 UTC
[08/13] flink git commit: [FLINK-7908][QS] Restructure the queryable
state module.
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
deleted file mode 100644
index 18a88da..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.messages.MessageBody;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * The base class of every handler used by an {@link AbstractServerBase}.
- *
- * @param <REQ> the type of request the server expects to receive.
- * @param <RESP> the type of response the server will send.
- */
-@Internal
-@ChannelHandler.Sharable
-public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter {
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractServerHandler.class);
-
- /** The owning server of this handler. */
- private final AbstractServerBase<REQ, RESP> server;
-
- /** The serializer used to (de-)serialize messages. */
- private final MessageSerializer<REQ, RESP> serializer;
-
- /** Thread pool for query execution. */
- protected final ExecutorService queryExecutor;
-
- /** Exposed server statistics. */
- private final KvStateRequestStats stats;
-
- /**
- * Create the handler.
- *
- * @param serializer the serializer used to (de-)serialize messages
- * @param stats statistics collector
- */
- public AbstractServerHandler(
- final AbstractServerBase<REQ, RESP> server,
- final MessageSerializer<REQ, RESP> serializer,
- final KvStateRequestStats stats) {
-
- this.server = Preconditions.checkNotNull(server);
- this.serializer = Preconditions.checkNotNull(serializer);
- this.queryExecutor = server.getQueryExecutor();
- this.stats = Preconditions.checkNotNull(stats);
- }
-
- protected String getServerName() {
- return server.getServerName();
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- stats.reportActiveConnection();
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- stats.reportInactiveConnection();
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- REQ request = null;
- long requestId = -1L;
-
- try {
- final ByteBuf buf = (ByteBuf) msg;
- final MessageType msgType = MessageSerializer.deserializeHeader(buf);
-
- requestId = MessageSerializer.getRequestId(buf);
-
- if (msgType == MessageType.REQUEST) {
-
- // ------------------------------------------------------------
- // MessageBody
- // ------------------------------------------------------------
- request = serializer.deserializeRequest(buf);
- stats.reportRequest();
-
- // Execute actual query async, because it is possibly
- // blocking (e.g. file I/O).
- //
- // A submission failure is not treated as fatal.
- queryExecutor.submit(new AsyncRequestTask<>(this, ctx, requestId, request, stats));
-
- } else {
- // ------------------------------------------------------------
- // Unexpected
- // ------------------------------------------------------------
-
- final String errMsg = "Unexpected message type " + msgType + ". Expected " + MessageType.REQUEST + ".";
- final ByteBuf failure = MessageSerializer.serializeServerFailure(ctx.alloc(), new IllegalArgumentException(errMsg));
-
- LOG.debug(errMsg);
- ctx.writeAndFlush(failure);
- }
- } catch (Throwable t) {
- final String stringifiedCause = ExceptionUtils.stringifyException(t);
-
- String errMsg;
- ByteBuf err;
- if (request != null) {
- errMsg = "Failed request with ID " + requestId + ". Caused by: " + stringifiedCause;
- err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg));
- stats.reportFailedRequest();
- } else {
- errMsg = "Failed incoming message. Caused by: " + stringifiedCause;
- err = MessageSerializer.serializeServerFailure(ctx.alloc(), new RuntimeException(errMsg));
- }
-
- LOG.debug(errMsg);
- ctx.writeAndFlush(err);
-
- } finally {
- // IMPORTANT: We have to always recycle the incoming buffer.
- // Otherwise we will leak memory out of Netty's buffer pool.
- //
- // If any operation ever holds on to the buffer, it is the
- // responsibility of that operation to retain the buffer and
- // release it later.
- ReferenceCountUtil.release(msg);
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- final String msg = "Exception in server pipeline. Caused by: " + ExceptionUtils.stringifyException(cause);
- final ByteBuf err = serializer.serializeServerFailure(ctx.alloc(), new RuntimeException(msg));
-
- LOG.debug(msg);
- ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE);
- }
-
- /**
- * Handles an incoming request and returns a {@link CompletableFuture} containing the corresponding response.
- *
- * <p><b>NOTE:</b> This method is called by multiple threads.
- *
- * @param requestId the id of the received request to be handled.
- * @param request the request to be handled.
- * @return A future with the response to be forwarded to the client.
- */
- public abstract CompletableFuture<RESP> handleRequest(final long requestId, final REQ request);
-
- /**
- * Shuts down any handler specific resources, e.g. thread pools etc.
- */
- public abstract void shutdown();
-
- /**
- * Task to execute the actual query against the {@link InternalKvState} instance.
- */
- private static class AsyncRequestTask<REQ extends MessageBody, RESP extends MessageBody> implements Runnable {
-
- private final AbstractServerHandler<REQ, RESP> handler;
-
- private final ChannelHandlerContext ctx;
-
- private final long requestId;
-
- private final REQ request;
-
- private final KvStateRequestStats stats;
-
- private final long creationNanos;
-
- AsyncRequestTask(
- final AbstractServerHandler<REQ, RESP> handler,
- final ChannelHandlerContext ctx,
- final long requestId,
- final REQ request,
- final KvStateRequestStats stats) {
-
- this.handler = Preconditions.checkNotNull(handler);
- this.ctx = Preconditions.checkNotNull(ctx);
- this.requestId = requestId;
- this.request = Preconditions.checkNotNull(request);
- this.stats = Preconditions.checkNotNull(stats);
- this.creationNanos = System.nanoTime();
- }
-
- @Override
- public void run() {
-
- if (!ctx.channel().isActive()) {
- return;
- }
-
- handler.handleRequest(requestId, request).whenComplete((resp, throwable) -> {
- try {
- if (throwable != null) {
- throw throwable instanceof CompletionException
- ? throwable.getCause()
- : throwable;
- }
-
- if (resp == null) {
- throw new BadRequestException(handler.getServerName(), "NULL returned for request with ID " + requestId + ".");
- }
-
- final ByteBuf serialResp = MessageSerializer.serializeResponse(ctx.alloc(), requestId, resp);
-
- int highWatermark = ctx.channel().config().getWriteBufferHighWaterMark();
-
- ChannelFuture write;
- if (serialResp.readableBytes() <= highWatermark) {
- write = ctx.writeAndFlush(serialResp);
- } else {
- write = ctx.writeAndFlush(new ChunkedByteBuf(serialResp, highWatermark));
- }
- write.addListener(new RequestWriteListener());
-
- } catch (BadRequestException e) {
- try {
- stats.reportFailedRequest();
- final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, e);
- ctx.writeAndFlush(err);
- } catch (IOException io) {
- LOG.error("Failed to respond with the error after failed request", io);
- }
- } catch (Throwable t) {
- try {
- stats.reportFailedRequest();
-
- final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
- final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg));
- ctx.writeAndFlush(err);
- } catch (IOException io) {
- LOG.error("Failed to respond with the error after failed request", io);
- }
- }
- });
- }
-
- @Override
- public String toString() {
- return "AsyncRequestTask{" +
- "requestId=" + requestId +
- ", request=" + request +
- '}';
- }
-
- /**
- * Callback after query result has been written.
- *
- * <p>Gathers stats and logs errors.
- */
- private class RequestWriteListener implements ChannelFutureListener {
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- long durationNanos = System.nanoTime() - creationNanos;
- long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
-
- if (future.isSuccess()) {
- LOG.debug("Request {} was successfully answered after {} ms.", request, durationMillis);
- stats.reportSuccessfulRequest(durationMillis);
- } else {
- LOG.debug("Request {} failed after {} ms : ", request, durationMillis, future.cause());
- stats.reportFailedRequest();
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java
deleted file mode 100644
index 3c0c484..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Base class for exceptions thrown during querying Flink's managed state.
- */
-@Internal
-public class BadRequestException extends Exception {
-
- private static final long serialVersionUID = 3458743952407632903L;
-
- public BadRequestException(String serverName, String message) {
- super(Preconditions.checkNotNull(serverName) + " : " + message);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
deleted file mode 100644
index 9c56025..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-
-/**
- * A {@link ByteBuf} instance to be consumed in chunks by {@link ChunkedWriteHandler},
- * respecting the high and low watermarks.
- *
- * @see <a href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High Watermarks</a>
- */
-@Internal
-public class ChunkedByteBuf implements ChunkedInput<ByteBuf> {
-
- /** The buffer to chunk. */
- private final ByteBuf buf;
-
- /** Size of chunks. */
- private final int chunkSize;
-
- /** Closed flag. */
- private boolean isClosed;
-
- /** End of input flag. */
- private boolean isEndOfInput;
-
- public ChunkedByteBuf(ByteBuf buf, int chunkSize) {
- this.buf = Preconditions.checkNotNull(buf, "Buffer");
- Preconditions.checkArgument(chunkSize > 0, "Non-positive chunk size");
- this.chunkSize = chunkSize;
- }
-
- @Override
- public boolean isEndOfInput() throws Exception {
- return isClosed || isEndOfInput;
- }
-
- @Override
- public void close() throws Exception {
- if (!isClosed) {
- // If we did not consume the whole buffer yet, we have to release
- // it here. Otherwise, it's the responsibility of the consumer.
- if (!isEndOfInput) {
- buf.release();
- }
-
- isClosed = true;
- }
- }
-
- @Override
- public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
- if (isClosed) {
- return null;
- } else if (buf.readableBytes() <= chunkSize) {
- isEndOfInput = true;
-
- // Don't retain as the consumer is responsible to release it
- return buf.slice();
- } else {
- // Return a chunk sized slice of the buffer. The ref count is
- // shared with the original buffer. That's why we need to retain
- // a reference here.
- return buf.readSlice(chunkSize).retain();
- }
- }
-
- @Override
- public String toString() {
- return "ChunkedByteBuf{" +
- "buf=" + buf +
- ", chunkSize=" + chunkSize +
- ", isClosed=" + isClosed +
- ", isEndOfInput=" + isEndOfInput +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
deleted file mode 100644
index e6d59de..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ /dev/null
@@ -1,537 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.messages.MessageBody;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
-import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayDeque;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * The base class for every client in the queryable state module.
- * It is using pure netty to send and receive messages of type {@link MessageBody}.
- *
- * @param <REQ> the type of request the client will send.
- * @param <RESP> the type of response the client expects to receive.
- */
-@Internal
-public class Client<REQ extends MessageBody, RESP extends MessageBody> {
-
- /** The name of the client. Used for logging and stack traces.*/
- private final String clientName;
-
- /** Netty's Bootstrap. */
- private final Bootstrap bootstrap;
-
- /** The serializer to be used for (de-)serializing messages. */
- private final MessageSerializer<REQ, RESP> messageSerializer;
-
- /** Statistics tracker. */
- private final KvStateRequestStats stats;
-
- /** Established connections. */
- private final Map<KvStateServerAddress, EstablishedConnection> establishedConnections = new ConcurrentHashMap<>();
-
- /** Pending connections. */
- private final Map<KvStateServerAddress, PendingConnection> pendingConnections = new ConcurrentHashMap<>();
-
- /** Atomic shut down flag. */
- private final AtomicBoolean shutDown = new AtomicBoolean();
-
- /**
- * Creates a client with the specified number of event loop threads.
- *
- * @param clientName the name of the client.
- * @param numEventLoopThreads number of event loop threads (minimum 1).
- * @param serializer the serializer used to (de-)serialize messages.
- * @param stats the statistics collector.
- */
- public Client(
- final String clientName,
- final int numEventLoopThreads,
- final MessageSerializer<REQ, RESP> serializer,
- final KvStateRequestStats stats) {
-
- Preconditions.checkArgument(numEventLoopThreads >= 1,
- "Non-positive number of event loop threads.");
-
- this.clientName = Preconditions.checkNotNull(clientName);
- this.messageSerializer = Preconditions.checkNotNull(serializer);
- this.stats = Preconditions.checkNotNull(stats);
-
- final ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("Flink " + clientName + " Event Loop Thread %d")
- .build();
-
- final EventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
- final ByteBufAllocator bufferPool = new NettyBufferPool(numEventLoopThreads);
-
- this.bootstrap = new Bootstrap()
- .group(nioGroup)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.ALLOCATOR, bufferPool)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
- channel.pipeline()
- .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
- .addLast(new ChunkedWriteHandler());
- }
- });
- }
-
- public String getClientName() {
- return clientName;
- }
-
- public CompletableFuture<RESP> sendRequest(final KvStateServerAddress serverAddress, final REQ request) {
- if (shutDown.get()) {
- return FutureUtils.getFailedFuture(new IllegalStateException("Shut down"));
- }
-
- EstablishedConnection connection = establishedConnections.get(serverAddress);
- if (connection != null) {
- return connection.sendRequest(request);
- } else {
- PendingConnection pendingConnection = pendingConnections.get(serverAddress);
- if (pendingConnection != null) {
- // There was a race, use the existing pending connection.
- return pendingConnection.sendRequest(request);
- } else {
- // We try to connect to the server.
- PendingConnection pending = new PendingConnection(serverAddress, messageSerializer);
- PendingConnection previous = pendingConnections.putIfAbsent(serverAddress, pending);
-
- if (previous == null) {
- // OK, we are responsible to connect.
- bootstrap.connect(serverAddress.getHost(), serverAddress.getPort()).addListener(pending);
- return pending.sendRequest(request);
- } else {
- // There was a race, use the existing pending connection.
- return previous.sendRequest(request);
- }
- }
- }
- }
-
- /**
- * Shuts down the client and closes all connections.
- *
- * <p>After a call to this method, all returned futures will be failed.
- */
- public void shutdown() {
- if (shutDown.compareAndSet(false, true)) {
- for (Map.Entry<KvStateServerAddress, EstablishedConnection> conn : establishedConnections.entrySet()) {
- if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
- conn.getValue().close();
- }
- }
-
- for (Map.Entry<KvStateServerAddress, PendingConnection> conn : pendingConnections.entrySet()) {
- if (pendingConnections.remove(conn.getKey()) != null) {
- conn.getValue().close();
- }
- }
-
- if (bootstrap != null) {
- EventLoopGroup group = bootstrap.group();
- if (group != null) {
- group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
- }
- }
- }
- }
-
- /**
- * A pending connection that is in the process of connecting.
- */
- private class PendingConnection implements ChannelFutureListener {
-
- /** Lock to guard the connect call, channel hand in, etc. */
- private final Object connectLock = new Object();
-
- /** Address of the server we are connecting to. */
- private final KvStateServerAddress serverAddress;
-
- private final MessageSerializer<REQ, RESP> serializer;
-
- /** Queue of requests while connecting. */
- private final ArrayDeque<PendingRequest> queuedRequests = new ArrayDeque<>();
-
- /** The established connection after the connect succeeds. */
- private EstablishedConnection established;
-
- /** Closed flag. */
- private boolean closed;
-
- /** Failure cause if something goes wrong. */
- private Throwable failureCause;
-
- /**
- * Creates a pending connection to the given server.
- *
- * @param serverAddress Address of the server to connect to.
- */
- private PendingConnection(
- final KvStateServerAddress serverAddress,
- final MessageSerializer<REQ, RESP> serializer) {
- this.serverAddress = serverAddress;
- this.serializer = serializer;
- }
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- handInChannel(future.channel());
- } else {
- close(future.cause());
- }
- }
-
- /**
- * Returns a future holding the serialized request result.
- *
- * <p>If the channel has been established, forward the call to the
- * established channel, otherwise queue it for when the channel is
- * handed in.
- *
- * @param request the request to be sent.
- * @return Future holding the serialized result
- */
- public CompletableFuture<RESP> sendRequest(REQ request) {
- synchronized (connectLock) {
- if (failureCause != null) {
- return FutureUtils.getFailedFuture(failureCause);
- } else if (closed) {
- return FutureUtils.getFailedFuture(new ClosedChannelException());
- } else {
- if (established != null) {
- return established.sendRequest(request);
- } else {
- // Queue this and handle when connected
- final PendingRequest pending = new PendingRequest(request);
- queuedRequests.add(pending);
- return pending;
- }
- }
- }
- }
-
- /**
- * Hands in a channel after a successful connection.
- *
- * @param channel Channel to hand in
- */
- private void handInChannel(Channel channel) {
- synchronized (connectLock) {
- if (closed || failureCause != null) {
- // Close the channel and we are done. Any queued requests
- // are removed on the close/failure call and after that no
- // new ones can be enqueued.
- channel.close();
- } else {
- established = new EstablishedConnection(serverAddress, serializer, channel);
-
- while (!queuedRequests.isEmpty()) {
- final PendingRequest pending = queuedRequests.poll();
-
- established.sendRequest(pending.request)
- .thenAccept(resp -> pending.complete(resp))
- .exceptionally(throwable -> {
- pending.completeExceptionally(throwable);
- return null;
- });
- }
-
- // Publish the channel for the general public
- establishedConnections.put(serverAddress, established);
- pendingConnections.remove(serverAddress);
-
- // Check shut down for possible race with shut down. We
- // don't want any lingering connections after shut down,
- // which can happen if we don't check this here.
- if (shutDown.get()) {
- if (establishedConnections.remove(serverAddress, established)) {
- established.close();
- }
- }
- }
- }
- }
-
- /**
- * Close the connecting channel with a ClosedChannelException.
- */
- private void close() {
- close(new ClosedChannelException());
- }
-
- /**
- * Close the connecting channel with an Exception (can be {@code null})
- * or forward to the established channel.
- */
- private void close(Throwable cause) {
- synchronized (connectLock) {
- if (!closed) {
- if (failureCause == null) {
- failureCause = cause;
- }
-
- if (established != null) {
- established.close();
- } else {
- PendingRequest pending;
- while ((pending = queuedRequests.poll()) != null) {
- pending.completeExceptionally(cause);
- }
- }
- closed = true;
- }
- }
- }
-
- @Override
- public String toString() {
- synchronized (connectLock) {
- return "PendingConnection{" +
- "serverAddress=" + serverAddress +
- ", queuedRequests=" + queuedRequests.size() +
- ", established=" + (established != null) +
- ", closed=" + closed +
- '}';
- }
- }
-
- /**
- * A pending request queued while the channel is connecting.
- */
- private final class PendingRequest extends CompletableFuture<RESP> {
-
- private final REQ request;
-
- private PendingRequest(REQ request) {
- this.request = request;
- }
- }
- }
-
- /**
- * An established connection that wraps the actual channel instance and is
- * registered at the {@link ClientHandler} for callbacks.
- */
- private class EstablishedConnection implements ClientHandlerCallback<RESP> {
-
- /** Address of the server we are connected to. */
- private final KvStateServerAddress serverAddress;
-
- /** The actual TCP channel. */
- private final Channel channel;
-
- /** Pending requests keyed by request ID. */
- private final ConcurrentHashMap<Long, TimestampedCompletableFuture> pendingRequests = new ConcurrentHashMap<>();
-
- /** Current request number used to assign unique request IDs. */
- private final AtomicLong requestCount = new AtomicLong();
-
- /** Reference to a failure that was reported by the channel. */
- private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
-
- /**
- * Creates an established connection with the given channel.
- *
- * @param serverAddress Address of the server connected to
- * @param channel The actual TCP channel
- */
- EstablishedConnection(
- final KvStateServerAddress serverAddress,
- final MessageSerializer<REQ, RESP> serializer,
- final Channel channel) {
-
- this.serverAddress = Preconditions.checkNotNull(serverAddress);
- this.channel = Preconditions.checkNotNull(channel);
-
- // Add the client handler with the callback
- channel.pipeline().addLast(
- getClientName() + " Handler",
- new ClientHandler<>(clientName, serializer, this)
- );
-
- stats.reportActiveConnection();
- }
-
- /**
- * Close the channel with a ClosedChannelException.
- */
- void close() {
- close(new ClosedChannelException());
- }
-
- /**
- * Close the channel with a cause.
- *
- * @param cause The cause to close the channel with.
- * @return Channel close future
- */
- private boolean close(Throwable cause) {
- if (failureCause.compareAndSet(null, cause)) {
- channel.close();
- stats.reportInactiveConnection();
-
- for (long requestId : pendingRequests.keySet()) {
- TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
- if (pending != null && pending.completeExceptionally(cause)) {
- stats.reportFailedRequest();
- }
- }
- return true;
- }
- return false;
- }
-
- /**
- * Returns a future holding the serialized request result.
- * @param request the request to be sent.
- * @return Future holding the serialized result
- */
- CompletableFuture<RESP> sendRequest(REQ request) {
- TimestampedCompletableFuture requestPromiseTs =
- new TimestampedCompletableFuture(System.nanoTime());
- try {
- final long requestId = requestCount.getAndIncrement();
- pendingRequests.put(requestId, requestPromiseTs);
-
- stats.reportRequest();
-
- ByteBuf buf = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
-
- channel.writeAndFlush(buf).addListener((ChannelFutureListener) future -> {
- if (!future.isSuccess()) {
- // Fail promise if not failed to write
- TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
- if (pending != null && pending.completeExceptionally(future.cause())) {
- stats.reportFailedRequest();
- }
- }
- });
-
- // Check failure for possible race. We don't want any lingering
- // promises after a failure, which can happen if we don't check
- // this here. Note that close is treated as a failure as well.
- Throwable failure = failureCause.get();
- if (failure != null) {
- // Remove from pending requests to guard against concurrent
- // removal and to make sure that we only count it once as failed.
- TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
- if (pending != null && pending.completeExceptionally(failure)) {
- stats.reportFailedRequest();
- }
- }
- } catch (Throwable t) {
- requestPromiseTs.completeExceptionally(t);
- }
-
- return requestPromiseTs;
- }
-
- @Override
- public void onRequestResult(long requestId, RESP response) {
- TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
- if (pending != null && pending.complete(response)) {
- long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
- stats.reportSuccessfulRequest(durationMillis);
- }
- }
-
- @Override
- public void onRequestFailure(long requestId, Throwable cause) {
- TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
- if (pending != null && pending.completeExceptionally(cause)) {
- stats.reportFailedRequest();
- }
- }
-
- @Override
- public void onFailure(Throwable cause) {
- if (close(cause)) {
- // Remove from established channels, otherwise future
- // requests will be handled by this failed channel.
- establishedConnections.remove(serverAddress, this);
- }
- }
-
- @Override
- public String toString() {
- return "EstablishedConnection{" +
- "serverAddress=" + serverAddress +
- ", channel=" + channel +
- ", pendingRequests=" + pendingRequests.size() +
- ", requestCount=" + requestCount +
- ", failureCause=" + failureCause +
- '}';
- }
-
- /**
- * Pair of promise and a timestamp.
- */
- private class TimestampedCompletableFuture extends CompletableFuture<RESP> {
-
- private final long timestampInNanos;
-
- TimestampedCompletableFuture(long timestampInNanos) {
- this.timestampInNanos = timestampInNanos;
- }
-
- public long getTimestamp() {
- return timestampInNanos;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
deleted file mode 100644
index fc9b1d4..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.messages.MessageBody;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.queryablestate.network.messages.RequestFailure;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.channels.ClosedChannelException;
-
-/**
- * The handler used by a {@link Client} to handling incoming messages.
- *
- * @param <REQ> the type of request the client will send.
- * @param <RESP> the type of response the client expects to receive.
- */
-@Internal
-public class ClientHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter {
-
- private static final Logger LOG = LoggerFactory.getLogger(ClientHandler.class);
-
- private final String clientName;
-
- private final MessageSerializer<REQ, RESP> serializer;
-
- private final ClientHandlerCallback<RESP> callback;
-
- /**
- * Creates a handler with the callback.
- *
- * @param clientName the name of the client.
- * @param serializer the serializer used to (de-)serialize messages.
- * @param callback Callback for responses.
- */
- public ClientHandler(
- final String clientName,
- final MessageSerializer<REQ, RESP> serializer,
- final ClientHandlerCallback<RESP> callback) {
-
- this.clientName = Preconditions.checkNotNull(clientName);
- this.serializer = Preconditions.checkNotNull(serializer);
- this.callback = Preconditions.checkNotNull(callback);
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- try {
- ByteBuf buf = (ByteBuf) msg;
- MessageType msgType = MessageSerializer.deserializeHeader(buf);
-
- if (msgType == MessageType.REQUEST_RESULT) {
- long requestId = MessageSerializer.getRequestId(buf);
- RESP result = serializer.deserializeResponse(buf);
- callback.onRequestResult(requestId, result);
- } else if (msgType == MessageType.REQUEST_FAILURE) {
- RequestFailure failure = MessageSerializer.deserializeRequestFailure(buf);
- callback.onRequestFailure(failure.getRequestId(), failure.getCause());
- } else if (msgType == MessageType.SERVER_FAILURE) {
- throw MessageSerializer.deserializeServerFailure(buf);
- } else {
- throw new IllegalStateException("Unexpected response type '" + msgType + "'");
- }
- } catch (Throwable t1) {
- try {
- callback.onFailure(t1);
- } catch (Throwable t2) {
- LOG.error("Failed to notify callback about failure", t2);
- }
- } finally {
- ReferenceCountUtil.release(msg);
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- try {
- callback.onFailure(cause);
- } catch (Throwable t) {
- LOG.error("Failed to notify callback about failure", t);
- }
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- // Only the client is expected to close the channel. Otherwise it
- // indicates a failure. Note that this will be invoked in both cases
- // though. If the callback closed the channel, the callback must be
- // ignored.
- try {
- callback.onFailure(new ClosedChannelException());
- } catch (Throwable t) {
- LOG.error("Failed to notify callback about failure", t);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
deleted file mode 100644
index 00ce1ed..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.network.messages.MessageBody;
-
-/**
- * Callback for {@link ClientHandler}.
- */
-@Internal
-public interface ClientHandlerCallback<RESP extends MessageBody> {
-
- /**
- * Called on a successful request.
- *
- * @param requestId ID of the request
- * @param response The received response
- */
- void onRequestResult(long requestId, RESP response);
-
- /**
- * Called on a failed request.
- *
- * @param requestId ID of the request
- * @param cause Cause of the request failure
- */
- void onRequestFailure(long requestId, Throwable cause);
-
- /**
- * Called on any failure, which is not related to a specific request.
- *
- * <p>This can be for example a caught Exception in the channel pipeline
- * or an unexpected channel close.
- *
- * @param cause Cause of the failure
- */
- void onFailure(Throwable cause);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
deleted file mode 100644
index f26c267..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network.messages;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * The base class for every message exchanged during the communication between
- * {@link org.apache.flink.queryablestate.network.Client client} and
- * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
- *
- * <p>Every such message should also have a {@link MessageDeserializer}.
- */
-@Internal
-public abstract class MessageBody {
-
- /**
- * Serializes the message into a byte array.
- * @return A byte array with the serialized content of the message.
- */
- public abstract byte[] serialize();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
deleted file mode 100644
index 436fb82..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network.messages;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-
-/**
- * A utility used to deserialize a {@link MessageBody message}.
- * @param <M> The type of the message to be deserialized.
- * It has to extend {@link MessageBody}
- */
-@Internal
-public interface MessageDeserializer<M extends MessageBody> {
-
- /**
- * Deserializes a message contained in a byte buffer.
- * @param buf the buffer containing the message.
- * @return The deserialized message.
- */
- M deserializeMessage(ByteBuf buf);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
deleted file mode 100644
index c0a0d32..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network.messages;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufOutputStream;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-
-/**
- * Serialization and deserialization of messages exchanged between
- * {@link org.apache.flink.queryablestate.network.Client client} and
- * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
- *
- * <p>The binary messages have the following format:
- *
- * <pre>
- * <------ Frame ------------------------->
- * +----------------------------------------+
- * | HEADER (8) | PAYLOAD (VAR) |
- * +------------------+----------------------------------------+
- * | FRAME LENGTH (4) | VERSION (4) | TYPE (4) | CONTENT (VAR) |
- * +------------------+----------------------------------------+
- * </pre>
- *
- * <p>The concrete content of a message depends on the {@link MessageType}.
- *
- * @param <REQ> Type of the requests of the protocol.
- * @param <RESP> Type of the responses of the protocol.
- */
-@Internal
-public final class MessageSerializer<REQ extends MessageBody, RESP extends MessageBody> {
-
- /** The serialization version ID. */
- private static final int VERSION = 0x79a1b710;
-
- /** Byte length of the header. */
- private static final int HEADER_LENGTH = 2 * Integer.BYTES;
-
- /** Byte length of the request id. */
- private static final int REQUEST_ID_SIZE = Long.BYTES;
-
- /** The constructor of the {@link MessageBody client requests}. Used for deserialization. */
- private final MessageDeserializer<REQ> requestDeserializer;
-
- /** The constructor of the {@link MessageBody server responses}. Used for deserialization. */
- private final MessageDeserializer<RESP> responseDeserializer;
-
- public MessageSerializer(MessageDeserializer<REQ> requestDeser, MessageDeserializer<RESP> responseDeser) {
- requestDeserializer = Preconditions.checkNotNull(requestDeser);
- responseDeserializer = Preconditions.checkNotNull(responseDeser);
- }
-
- // ------------------------------------------------------------------------
- // Serialization
- // ------------------------------------------------------------------------
-
- /**
- * Serializes the request sent to the
- * {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
- *
- * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
- * @param requestId The id of the request to which the message refers to.
- * @param request The request to be serialized.
- * @return A {@link ByteBuf} containing the serialized message.
- */
- public static <REQ extends MessageBody> ByteBuf serializeRequest(
- final ByteBufAllocator alloc,
- final long requestId,
- final REQ request) {
- Preconditions.checkNotNull(request);
- return writePayload(alloc, requestId, MessageType.REQUEST, request.serialize());
- }
-
- /**
- * Serializes the response sent to the
- * {@link org.apache.flink.queryablestate.network.Client}.
- *
- * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
- * @param requestId The id of the request to which the message refers to.
- * @param response The response to be serialized.
- * @return A {@link ByteBuf} containing the serialized message.
- */
- public static <RESP extends MessageBody> ByteBuf serializeResponse(
- final ByteBufAllocator alloc,
- final long requestId,
- final RESP response) {
- Preconditions.checkNotNull(response);
- return writePayload(alloc, requestId, MessageType.REQUEST_RESULT, response.serialize());
- }
-
- /**
- * Serializes the exception containing the failure message sent to the
- * {@link org.apache.flink.queryablestate.network.Client} in case of
- * protocol related errors.
- *
- * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
- * @param requestId The id of the request to which the message refers to.
- * @param cause The exception thrown at the server.
- * @return A {@link ByteBuf} containing the serialized message.
- */
- public static ByteBuf serializeRequestFailure(
- final ByteBufAllocator alloc,
- final long requestId,
- final Throwable cause) throws IOException {
-
- final ByteBuf buf = alloc.ioBuffer();
-
- // Frame length is set at the end
- buf.writeInt(0);
- writeHeader(buf, MessageType.REQUEST_FAILURE);
- buf.writeLong(requestId);
-
- try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
- ObjectOutput out = new ObjectOutputStream(bbos)) {
- out.writeObject(cause);
- }
-
- // Set frame length
- int frameLength = buf.readableBytes() - Integer.BYTES;
- buf.setInt(0, frameLength);
- return buf;
- }
-
- /**
- * Serializes the failure message sent to the
- * {@link org.apache.flink.queryablestate.network.Client} in case of
- * server related errors.
- *
- * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
- * @param cause The exception thrown at the server.
- * @return The failure message.
- */
- public static ByteBuf serializeServerFailure(
- final ByteBufAllocator alloc,
- final Throwable cause) throws IOException {
-
- final ByteBuf buf = alloc.ioBuffer();
-
- // Frame length is set at end
- buf.writeInt(0);
- writeHeader(buf, MessageType.SERVER_FAILURE);
-
- try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
- ObjectOutput out = new ObjectOutputStream(bbos)) {
- out.writeObject(cause);
- }
-
- // Set frame length
- int frameLength = buf.readableBytes() - Integer.BYTES;
- buf.setInt(0, frameLength);
- return buf;
- }
-
- /**
- * Helper for serializing the header.
- *
- * @param buf The {@link ByteBuf} to serialize the header into.
- * @param messageType The {@link MessageType} of the message this header refers to.
- */
- private static void writeHeader(final ByteBuf buf, final MessageType messageType) {
- buf.writeInt(VERSION);
- buf.writeInt(messageType.ordinal());
- }
-
- /**
- * Helper for serializing the messages.
- *
- * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into.
- * @param requestId The id of the request to which the message refers to.
- * @param messageType The {@link MessageType type of the message}.
- * @param payload The serialized version of the message.
- * @return A {@link ByteBuf} containing the serialized message.
- */
- private static ByteBuf writePayload(
- final ByteBufAllocator alloc,
- final long requestId,
- final MessageType messageType,
- final byte[] payload) {
-
- final int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + payload.length;
- final ByteBuf buf = alloc.ioBuffer(frameLength + Integer.BYTES);
-
- buf.writeInt(frameLength);
- writeHeader(buf, messageType);
- buf.writeLong(requestId);
- buf.writeBytes(payload);
- return buf;
- }
-
- // ------------------------------------------------------------------------
- // Deserialization
- // ------------------------------------------------------------------------
-
- /**
- * De-serializes the header and returns the {@link MessageType}.
- * <pre>
- * <b>The buffer is expected to be at the header position.</b>
- * </pre>
- * @param buf The {@link ByteBuf} containing the serialized header.
- * @return The message type.
- * @throws IllegalStateException If unexpected message version or message type.
- */
- public static MessageType deserializeHeader(final ByteBuf buf) {
-
- // checking the version
- int version = buf.readInt();
- Preconditions.checkState(version == VERSION,
- "Version Mismatch: Found " + version + ", Expected: " + VERSION + '.');
-
- // fetching the message type
- int msgType = buf.readInt();
- MessageType[] values = MessageType.values();
- Preconditions.checkState(msgType >= 0 && msgType < values.length,
- "Illegal message type with index " + msgType + '.');
- return values[msgType];
- }
-
- /**
- * De-serializes the header and returns the {@link MessageType}.
- * <pre>
- * <b>The buffer is expected to be at the request id position.</b>
- * </pre>
- * @param buf The {@link ByteBuf} containing the serialized request id.
- * @return The request id.
- */
- public static long getRequestId(final ByteBuf buf) {
- return buf.readLong();
- }
-
- /**
- * De-serializes the request sent to the
- * {@link org.apache.flink.queryablestate.network.AbstractServerBase}.
- * <pre>
- * <b>The buffer is expected to be at the request position.</b>
- * </pre>
- * @param buf The {@link ByteBuf} containing the serialized request.
- * @return The request.
- */
- public REQ deserializeRequest(final ByteBuf buf) {
- Preconditions.checkNotNull(buf);
- return requestDeserializer.deserializeMessage(buf);
- }
-
- /**
- * De-serializes the response sent to the
- * {@link org.apache.flink.queryablestate.network.Client}.
- * <pre>
- * <b>The buffer is expected to be at the response position.</b>
- * </pre>
- * @param buf The {@link ByteBuf} containing the serialized response.
- * @return The response.
- */
- public RESP deserializeResponse(final ByteBuf buf) {
- Preconditions.checkNotNull(buf);
- return responseDeserializer.deserializeMessage(buf);
- }
-
- /**
- * De-serializes the {@link RequestFailure} sent to the
- * {@link org.apache.flink.queryablestate.network.Client} in case of
- * protocol related errors.
- * <pre>
- * <b>The buffer is expected to be at the correct position.</b>
- * </pre>
- * @param buf The {@link ByteBuf} containing the serialized failure message.
- * @return The failure message.
- */
- public static RequestFailure deserializeRequestFailure(final ByteBuf buf) throws IOException, ClassNotFoundException {
- long requestId = buf.readLong();
-
- Throwable cause;
- try (ByteBufInputStream bis = new ByteBufInputStream(buf);
- ObjectInputStream in = new ObjectInputStream(bis)) {
- cause = (Throwable) in.readObject();
- }
- return new RequestFailure(requestId, cause);
- }
-
- /**
- * De-serializes the failure message sent to the
- * {@link org.apache.flink.queryablestate.network.Client} in case of
- * server related errors.
- * <pre>
- * <b>The buffer is expected to be at the correct position.</b>
- * </pre>
- * @param buf The {@link ByteBuf} containing the serialized failure message.
- * @return The failure message.
- */
- public static Throwable deserializeServerFailure(final ByteBuf buf) throws IOException, ClassNotFoundException {
- try (ByteBufInputStream bis = new ByteBufInputStream(buf);
- ObjectInputStream in = new ObjectInputStream(bis)) {
- return (Throwable) in.readObject();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
deleted file mode 100644
index 562ce93..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network.messages;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * Expected message types during the communication between
- * {@link org.apache.flink.queryablestate.network.Client client} and
- * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}.
- */
-@Internal
-public enum MessageType {
-
- /** The message is a request. */
- REQUEST,
-
- /** The message is a successful response. */
- REQUEST_RESULT,
-
- /** The message indicates a protocol-related failure. */
- REQUEST_FAILURE,
-
- /** The message indicates a server failure. */
- SERVER_FAILURE
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
deleted file mode 100644
index 106199f..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network.messages;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * A message indicating a protocol-related error.
- */
-@Internal
-public class RequestFailure {
-
- /** ID of the request responding to. */
- private final long requestId;
-
- /** Failure cause. Not allowed to be a user type. */
- private final Throwable cause;
-
- /**
- * Creates a failure response to a {@link MessageBody}.
- *
- * @param requestId ID for the request responding to
- * @param cause Failure cause (not allowed to be a user type)
- */
- public RequestFailure(long requestId, Throwable cause) {
- this.requestId = requestId;
- this.cause = cause;
- }
-
- /**
- * Returns the request ID responding to.
- *
- * @return Request ID responding to
- */
- public long getRequestId() {
- return requestId;
- }
-
- /**
- * Returns the failure cause.
- *
- * @return Failure cause
- */
- public Throwable getCause() {
- return cause;
- }
-
- @Override
- public String toString() {
- return "RequestFailure{" +
- "requestId=" + requestId +
- ", cause=" + cause +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
deleted file mode 100644
index 055a5d0..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.server;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
-import org.apache.flink.queryablestate.UnknownKvStateIdException;
-import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
-import org.apache.flink.queryablestate.messages.KvStateResponse;
-import org.apache.flink.queryablestate.network.AbstractServerHandler;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-
-/**
- * This handler dispatches asynchronous tasks, which query {@link InternalKvState}
- * instances and write the result to the channel.
- *
- * <p>The network threads receive the message, deserialize it and dispatch the
- * query task. The actual query is handled in a separate thread as it might
- * otherwise block the network threads (file I/O etc.).
- */
-@Internal
-@ChannelHandler.Sharable
-public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {
-
- private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class);
-
- /** KvState registry holding references to the KvState instances. */
- private final KvStateRegistry registry;
-
- /**
- * Create the handler used by the {@link KvStateServerImpl}.
- *
- * @param server the {@link KvStateServerImpl} using the handler.
- * @param kvStateRegistry registry to query.
- * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages.
- * @param stats server statistics collector.
- */
- public KvStateServerHandler(
- final KvStateServerImpl server,
- final KvStateRegistry kvStateRegistry,
- final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer,
- final KvStateRequestStats stats) {
-
- super(server, serializer, stats);
- this.registry = Preconditions.checkNotNull(kvStateRegistry);
- }
-
- @Override
- public CompletableFuture<KvStateResponse> handleRequest(final long requestId, final KvStateInternalRequest request) {
- final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
-
- try {
- final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
- if (kvState == null) {
- responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
- } else {
- byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
-
- byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
- if (serializedResult != null) {
- responseFuture.complete(new KvStateResponse(serializedResult));
- } else {
- responseFuture.completeExceptionally(new UnknownKeyOrNamespaceException(getServerName()));
- }
- }
- return responseFuture;
- } catch (Throwable t) {
- String errMsg = "Error while processing request with ID " + requestId +
- ". Caused by: " + ExceptionUtils.stringifyException(t);
- responseFuture.completeExceptionally(new RuntimeException(errMsg));
- return responseFuture;
- }
- }
-
- @Override
- public void shutdown() {
- // do nothing
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
deleted file mode 100644
index dfca915..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.server;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
-import org.apache.flink.queryablestate.messages.KvStateResponse;
-import org.apache.flink.queryablestate.network.AbstractServerBase;
-import org.apache.flink.queryablestate.network.AbstractServerHandler;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServer;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.util.Iterator;
-
-/**
- * The default implementation of the {@link KvStateServer}.
- */
-@Internal
-public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer {
-
- private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class);
-
- /** The {@link KvStateRegistry} to query for state instances. */
- private final KvStateRegistry kvStateRegistry;
-
- private final KvStateRequestStats stats;
-
- private MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer;
-
- /**
- * Creates the state server.
- *
- * <p>The server is instantiated using reflection by the
- * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)
- * QueryableStateUtils.createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)}.
- *
- * <p>The server needs to be started via {@link #start()} in order to bind
- * to the configured bind address.
- *
- * @param bindAddress the address to listen to.
- * @param bindPortIterator the port range to try to bind to.
- * @param numEventLoopThreads number of event loop threads.
- * @param numQueryThreads number of query threads.
- * @param kvStateRegistry {@link KvStateRegistry} to query for state instances.
- * @param stats the statistics collector.
- */
- public KvStateServerImpl(
- final InetAddress bindAddress,
- final Iterator<Integer> bindPortIterator,
- final Integer numEventLoopThreads,
- final Integer numQueryThreads,
- final KvStateRegistry kvStateRegistry,
- final KvStateRequestStats stats) {
-
- super("Queryable State Server", bindAddress, bindPortIterator, numEventLoopThreads, numQueryThreads);
- this.stats = Preconditions.checkNotNull(stats);
- this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
- }
-
- @Override
- public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> initializeHandler() {
- this.serializer = new MessageSerializer<>(
- new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
- new KvStateResponse.KvStateResponseDeserializer());
- return new KvStateServerHandler(this, kvStateRegistry, serializer, stats);
- }
-
- public MessageSerializer<KvStateInternalRequest, KvStateResponse> getSerializer() {
- Preconditions.checkState(serializer != null, "Server " + getServerName() + " has not been started.");
- return serializer;
- }
-
- @Override
- public void start() throws Throwable {
- super.start();
- }
-
- @Override
- public KvStateServerAddress getServerAddress() {
- return super.getServerAddress();
- }
-
- @Override
- public void shutdown() {
- super.shutdown();
- }
-}