You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by lu...@apache.org on 2021/11/25 22:21:28 UTC
[drill] branch master updated: DRILL-8009: DrillConnectionImpl#isValid() doesn't correspond JDBC API
This is an automated email from the ASF dual-hosted git repository.
luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 5fb79a1 DRILL-8009: DrillConnectionImpl#isValid() doesn't correspond JDBC API
5fb79a1 is described below
commit 5fb79a17b14c59ef9114e4306bfdbabf315a591b
Author: Maksym Rymar <ri...@gmail.com>
AuthorDate: Wed Oct 13 18:45:31 2021 +0300
DRILL-8009: DrillConnectionImpl#isValid() doesn't correspond JDBC API
---
.../org/apache/drill/exec/client/DrillClient.java | 22 ++
.../drill/jdbc/impl/DrillConnectionImpl.java | 20 +-
.../org/apache/drill/jdbc/impl/DrillHandler.java | 2 -
...rill2489CallsAfterCloseThrowExceptionsTest.java | 11 +-
.../org/apache/drill/exec/rpc/BasicClient.java | 274 ++++++++++++++-------
.../java/org/apache/drill/exec/rpc/RpcBus.java | 24 +-
.../org/apache/drill/exec/rpc/RpcConstants.java | 1 +
7 files changed, 240 insertions(+), 114 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 49375c9..1e84608 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -40,6 +40,7 @@ import org.apache.drill.common.DrillAutoCloseables;
import org.apache.drill.common.Version;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
@@ -49,6 +50,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -826,6 +828,26 @@ public class DrillClient implements Closeable, ConnectionThrottle {
client.submitQuery(resultsListener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
}
+ /**
+ * @return true if client has connection and it is connected, false otherwise
+ */
+ public boolean connectionIsActive() {
+ return client.isActive();
+ }
+
+ /**
+ * Verify connection with request-answer.
+ *
+ * @param timeoutSec time in seconds to wait answer receiving. If 0 then won't wait.
+ * @return true if {@link GeneralRPCProtos.RpcMode#PONG PONG} received until timeout, false otherwise
+ */
+ public boolean hasPing(long timeoutSec) throws DrillRuntimeException {
+ if (timeoutSec < 0) {
+ timeoutSec = 0;
+ }
+ return client.hasPing(timeoutSec);
+ }
+
private class ListHoldingResultsListener implements UserResultsListener {
private final Vector<QueryDataBatch> results = new Vector<>();
private final SettableFuture<List<QueryDataBatch>> future = SettableFuture.create();
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java
index 714a3ec..63f2f77 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java
@@ -168,7 +168,7 @@ public class DrillConnectionImpl extends AvaticaConnection implements DrillConne
throw new SQLException("Failure in creating DrillConnectionImpl: " + e, e);
}
} catch (Throwable t) {
- cleanup();
+ close();
throw t;
}
}
@@ -248,7 +248,11 @@ public class DrillConnectionImpl extends AvaticaConnection implements DrillConne
@Override
public boolean isClosed() {
try {
- return super.isClosed();
+ if (super.isClosed()) {
+ return true;
+ } else {
+ return client != null ? !client.connectionIsActive() : false;
+ }
} catch (SQLException e) {
// Currently can't happen, since AvaticaConnection.isClosed() never throws
// SQLException.
@@ -520,8 +524,11 @@ public class DrillConnectionImpl extends AvaticaConnection implements DrillConne
@Override
public boolean isValid(int timeout) throws SQLException {
- checkOpen();
- return super.isValid(timeout);
+ if (timeout < 0) {
+ throw new SQLException(String.format("Invalid timeout (%d<0).", timeout));
+ }
+ return !isClosed()
+ && client.hasPing(timeout);
}
@Override
@@ -625,8 +632,9 @@ public class DrillConnectionImpl extends AvaticaConnection implements DrillConne
}
}
- // TODO this should be an AutoCloseable, and this should be close()
- void cleanup() {
+ @Override
+ public void close() throws SQLException {
+ super.close();
// First close any open JDBC Statement objects, to close any open ResultSet
// objects and release their buffers/vectors.
openStatementsRegistry.close();
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillHandler.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillHandler.java
index 84fed2b..7b84be8 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillHandler.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillHandler.java
@@ -30,8 +30,6 @@ public class DrillHandler implements Handler {
@Override
public void onConnectionClose(AvaticaConnection c) throws RuntimeException {
- DrillConnectionImpl connection = (DrillConnectionImpl) c;
- connection.cleanup();
}
@Override
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java
index 657c86d..e18ad7f 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java
@@ -294,9 +294,14 @@ public class Drill2489CallsAfterCloseThrowExceptionsTest extends JdbcTestBase {
* Reports whether it's okay if given method didn't throw any exception.
*/
protected boolean isOkayNonthrowingMethod(Method method) {
- return
- "isClosed".equals(method.getName())
- || "close".equals(method.getName());
+ switch (method.getName()) {
+ case "isClosed":
+ case "close":
+ case "isValid":
+ return true;
+ default:
+ return false;
+ }
}
/**
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 30be9c7..bd4c0ef 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.rpc;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import com.google.protobuf.Internal.EnumLite;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
@@ -26,6 +25,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
@@ -34,30 +34,34 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.hadoop.security.UserGroupInformation;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.TimeUnit;
/**
- *
- * @param <T> handshake rpc type
+ * @param <T> handshake rpc type
* @param <CC> Client connection type
* @param <HS> Handshake send type
* @param <HR> Handshake receive type
*/
public abstract class BasicClient<T extends EnumLite, CC extends ClientConnection,
- HS extends MessageLite, HR extends MessageLite>
- extends RpcBus<T, CC> {
+ HS extends MessageLite, HR extends MessageLite>
+ extends RpcBus<T, CC> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
// The percentage of time that should pass before sending a ping message to ensure server doesn't time us out. For
@@ -71,7 +75,7 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
private final Class<HR> responseClass;
private final Parser<HR> handshakeParser;
- private final IdlePingHandler pingHandler;
+ private HeartBeatHandler heartBeatHandler;
private ConnectionMultiListener.SSLHandshakeListener sslHandshakeListener = null;
// Determines if authentication is completed between client and server
@@ -83,48 +87,48 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
this.responseClass = responseClass;
this.handshakeType = handshakeType;
this.handshakeParser = handshakeParser;
- final long timeoutInMillis = rpcMapping.hasTimeout() ?
- (long) (rpcMapping.getTimeout() * 1000.0 * PERCENT_TIMEOUT_BEFORE_SENDING_PING) :
- -1;
- this.pingHandler = rpcMapping.hasTimeout() ? new IdlePingHandler(timeoutInMillis) : null;
+ final int readIdleSec = rpcMapping.hasTimeout() ?
+ (int) (rpcMapping.getTimeout() * PERCENT_TIMEOUT_BEFORE_SENDING_PING) : -1;
+ IdleStateHandler idleStateHandler = rpcMapping.hasTimeout() ? new IdleStateHandler(readIdleSec, 0, 0) : null;
+
+ final int heartbeatWaitSec = rpcMapping.hasTimeout() ? rpcMapping.getTimeout() - readIdleSec : -1;
+ HeartBeatHandler heartBeatHandler = this.heartBeatHandler = new HeartBeatHandler(heartbeatWaitSec);
b = new Bootstrap() //
- .group(eventLoopGroup) //
- .channel(TransportCheck.getClientSocketChannel()) //
- .option(ChannelOption.ALLOCATOR, alloc) //
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000)
- .option(ChannelOption.SO_REUSEADDR, true)
- .option(ChannelOption.SO_RCVBUF, 1 << 17) //
- .option(ChannelOption.SO_SNDBUF, 1 << 17) //
- .option(ChannelOption.TCP_NODELAY, true)
- .handler(new ChannelInitializer<SocketChannel>() {
-
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- // logger.debug("initializing client connection.");
- connection = initRemoteConnection(ch);
-
- ch.closeFuture().addListener(getCloseHandler(ch, connection));
-
- final ChannelPipeline pipe = ch.pipeline();
- // Make sure that the SSL handler is the first handler in the pipeline so everything is encrypted
- if (isSslEnabled()) {
- setupSSL(pipe, sslHandshakeListener);
- }
-
- pipe.addLast(RpcConstants.PROTOCOL_DECODER, getDecoder(connection.getAllocator()));
- pipe.addLast(RpcConstants.MESSAGE_DECODER, new RpcDecoder("c-" + rpcConfig.getName()));
- pipe.addLast(RpcConstants.PROTOCOL_ENCODER, new RpcEncoder("c-" + rpcConfig.getName()));
- pipe.addLast(RpcConstants.HANDSHAKE_HANDLER, new ClientHandshakeHandler(connection));
-
- if(pingHandler != null){
- pipe.addLast(RpcConstants.IDLE_STATE_HANDLER, pingHandler);
- }
-
- pipe.addLast(RpcConstants.MESSAGE_HANDLER, new InboundHandler(connection));
- pipe.addLast(RpcConstants.EXCEPTION_HANDLER, new RpcExceptionHandler<>(connection));
+ .group(eventLoopGroup) //
+ .channel(TransportCheck.getClientSocketChannel()) //
+ .option(ChannelOption.ALLOCATOR, alloc) //
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.SO_RCVBUF, 1 << 17) //
+ .option(ChannelOption.SO_SNDBUF, 1 << 17) //
+ .option(ChannelOption.TCP_NODELAY, true)
+ .handler(new ChannelInitializer<SocketChannel>() {
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ // logger.debug("initializing client connection.");
+ connection = initRemoteConnection(ch);
+
+ ch.closeFuture().addListener(getCloseHandler(ch, connection));
+
+ final ChannelPipeline pipe = ch.pipeline();
+ // Make sure that the SSL handler is the first handler in the pipeline so everything is encrypted
+ if (isSslEnabled()) {
+ setupSSL(pipe, sslHandshakeListener);
+ }
+ if (idleStateHandler != null) {
+ pipe.addLast(RpcConstants.IDLE_STATE_HANDLER, idleStateHandler);
}
- }); //
+ pipe.addLast(RpcConstants.PROTOCOL_DECODER, getDecoder(connection.getAllocator()));
+ pipe.addLast(RpcConstants.MESSAGE_DECODER, new RpcDecoder("c-" + rpcConfig.getName()));
+ pipe.addLast(RpcConstants.PROTOCOL_ENCODER, new RpcEncoder("c-" + rpcConfig.getName()));
+ pipe.addLast(RpcConstants.HANDSHAKE_HANDLER, new ClientHandshakeHandler(connection));
+ pipe.addLast(RpcConstants.MESSAGE_HANDLER, new InboundHandler(connection));
+ pipe.addLast(RpcConstants.HEARTBEAT_HANDLER, heartBeatHandler);
+ pipe.addLast(RpcConstants.EXCEPTION_HANDLER, new RpcExceptionHandler<>(connection));
+ }
+ }); //
// if(TransportCheck.SUPPORTS_EPOLL){
// b.option(EpollChannelOption.SO_REUSEPORT, true); //
@@ -143,6 +147,7 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
/**
* Set's the state for authentication complete.
+ *
* @param authComplete - state to set. True means authentication between client and server is completed, false
* means authentication is in progress.
*/
@@ -160,38 +165,125 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
}
@Override
- protected CC initRemoteConnection(SocketChannel channel){
- local=channel.localAddress();
- remote=channel.remoteAddress();
+ protected CC initRemoteConnection(SocketChannel channel) {
+ local = channel.localAddress();
+ remote = channel.remoteAddress();
return null;
}
- private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);
-
/**
- * Handler that watches for situations where we haven't read from the socket in a certain timeout. If we exceed this
- * timeout, we send a PING message to the server to state that we are still alive.
+ * Handler watches for {@link IdleState#READER_IDLE IdleState.READER_IDLE} user event and sends message with
+ * {@link RpcMode#PING RpcMode.PING} to the server and waits for the {@link RpcMode#PONG RpcMode.PONG} answer.
+ * The handler watches for {@link RpcMode#PONG RpcMode.PONG} user event from
+ * {@link org.apache.drill.exec.rpc.RpcBus.InboundHandler} as a signal that the answer is received. If it is not received
+ * until answerWaitSec timeout, than the handler closes the connection.
*/
- private class IdlePingHandler extends IdleStateHandler {
+ private class HeartBeatHandler extends ChannelInboundHandlerAdapter {
+ private final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);
+ private final int answerWaitSec;
+ private final Queue<Pair<Promise<Boolean>, ScheduledFuture>> pongFutures = new LinkedList<>();
+ private ChannelHandlerContext ctx;
+
+ /**
+ * @param answerWaitSec timeout in seconds to wait an answer from the server
+ */
+ public HeartBeatHandler(int answerWaitSec) {
+ this.answerWaitSec = answerWaitSec;
+ }
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ this.ctx = ctx;
+ super.handlerAdded(ctx);
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof IdleStateEvent) {
+ IdleStateEvent idleState = (IdleStateEvent) evt;
+ if (idleState.state() == IdleState.READER_IDLE) {
+ idleEvent();
+ }
+ } else if (evt instanceof RpcMode) {
+ RpcMode rpcMode = (RpcMode) evt;
+ if (rpcMode == RpcMode.PONG) {
+ pongReceived();
+ }
+ }
+ ctx.fireUserEventTriggered(evt);
+ }
+
+ private void idleEvent() {
+ EventExecutor executor = ctx.executor();
+
+ Promise<Boolean> pongReceived = executor.newPromise();
+ ScheduledFuture<?> pongTimeoutChecker = null;
+
+ if (answerWaitSec > 0) {
+ pongTimeoutChecker = executor.schedule(() -> {
+ if (!pongReceived.isSuccess()) {
+ logger.error("Unable to get an answer from the server. Timeout: {} seconds. Connection: {}. " +
+ "Closing connection.", answerWaitSec, connection.getName());
+ connection.close();
+ }
+ }, answerWaitSec, TimeUnit.SECONDS);
+ }
+ pongFutures.add(Pair.of(pongReceived, pongTimeoutChecker));
+
+ sendPing();
+ }
- private GenericFutureListener<Future<? super Void>> pingFailedHandler = new GenericFutureListener<Future<? super Void>>() {
- public void operationComplete(Future<? super Void> future) throws Exception {
+ private void sendPing() {
+ ctx.channel().writeAndFlush(PING_MESSAGE).addListener(future -> {
if (!future.isSuccess()) {
logger.error("Unable to maintain connection {}. Closing connection.", connection.getName());
- connection.close();
+ close();
+ }
+ });
+ }
+
+ private void pongReceived() {
+ Pair<Promise<Boolean>, ScheduledFuture> pongFuture = pongFutures.poll();
+ if (pongFuture != null) {
+ Promise<Boolean> pongReceived = pongFuture.getLeft();
+ pongReceived.setSuccess(true);
+ ScheduledFuture pongTimeoutChecker = pongFuture.getRight();
+ if (pongTimeoutChecker != null) {
+ pongTimeoutChecker.cancel(false);
}
}
- };
+ }
- IdlePingHandler(long idleWaitInMillis) {
- super(0, idleWaitInMillis, 0, TimeUnit.MILLISECONDS);
+ public Promise<Boolean> demandHeartbeat() {
+ EventExecutor executor = this.ctx.executor();
+ Promise<Boolean> pongReceived = executor.newPromise();
+ pongFutures.add(Pair.of(pongReceived, null));
+ sendPing();
+ return pongReceived;
}
+ }
- @Override
- protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
- if (evt.state() == IdleState.WRITER_IDLE) {
- ctx.writeAndFlush(PING_MESSAGE).addListener(pingFailedHandler);
- }
+ /**
+ * Sends request and waits for answer to verify connection.
+ *
+ * @param timeoutSec time in seconds to wait message receiving. If 0 then won't wait.
+ * @return true if answer received until timeout, false otherwise
+ */
+ public boolean hasPing(long timeoutSec) {
+ if (timeoutSec < 0) {
+ timeoutSec = 0;
+ }
+ try {
+ return heartBeatHandler.demandHeartbeat().await(timeoutSec, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.warn("Heartbeat wait was interrupted.");
+
+ // Preserve evidence that the interruption occurred so that code higher up
+ // on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
+ return false;
}
}
@@ -206,7 +298,8 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
/**
* Creates various instances needed to start the SASL handshake. This is called from
* {@link BasicClient#validateHandshake(MessageLite)} if authentication is required from server side.
- * @param connectionHandler - Connection handler used by client's to know about success/failure conditions.
+ *
+ * @param connectionHandler - Connection handler used by client's to know about success/failure conditions.
* @param serverAuthMechanisms - List of auth mechanisms configured on server side
*/
protected abstract void prepareSaslHandshake(final RpcConnectionHandler<CC> connectionHandler,
@@ -217,11 +310,12 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
* after regular RPC handshake that authentication is required by server side. Once authentication is completed
* then only the underlying channel is made available to clients to send other RPC messages. Success and failure
* events are notified to the connection handler on which client waits.
+ *
* @param connectionHandler - Connection handler used by client's to know about success/failure conditions.
- * @param saslProperties - SASL related properties needed to create SASL client.
- * @param ugi - UserGroupInformation with logged in client side user
- * @param authFactory - Authentication factory to use for this SASL handshake.
- * @param rpcType - SASL_MESSAGE rpc type.
+ * @param saslProperties - SASL related properties needed to create SASL client.
+ * @param ugi - UserGroupInformation with logged in client side user
+ * @param authFactory - Authentication factory to use for this SASL handshake.
+ * @param rpcType - SASL_MESSAGE rpc type.
*/
protected void startSaslHandshake(final RpcConnectionHandler<CC> connectionHandler,
Map<String, ?> saslProperties, UserGroupInformation ugi,
@@ -245,22 +339,22 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
logger.debug("Initiating SASL exchange.");
new AuthenticationOutcomeListener<>(this, connection, rpcType, ugi,
new RpcOutcomeListener<Void>() {
- @Override
- public void failed(RpcException ex) {
- connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
- }
+ @Override
+ public void failed(RpcException ex) {
+ connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
+ }
- @Override
- public void success(Void value, ByteBuf buffer) {
- authComplete = true;
- connectionHandler.connectionSucceeded(connection);
- }
+ @Override
+ public void success(Void value, ByteBuf buffer) {
+ authComplete = true;
+ connectionHandler.connectionSucceeded(connection);
+ }
- @Override
- public void interrupted(InterruptedException ex) {
- connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
- }
- }).initiate(mechanismName);
+ @Override
+ public void interrupted(InterruptedException ex) {
+ connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
+ }
+ }).initiate(mechanismName);
}
protected void finalizeConnection(HR handshake, CC connection) {
@@ -280,16 +374,16 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
public <SEND extends MessageLite, RECEIVE extends MessageLite>
void send(RpcOutcomeListener<RECEIVE> listener, SEND protobufBody, boolean allowInEventLoop,
- ByteBuf... dataBodies) {
+ ByteBuf... dataBodies) {
super.send(listener, connection, handshakeType, protobufBody, (Class<RECEIVE>) responseClass,
- allowInEventLoop, dataBodies);
+ allowInEventLoop, dataBodies);
}
protected void connectAsClient(RpcConnectionHandler<CC> connectionListener, HS handshakeValue,
String host, int port) {
ConnectionMultiListener<T, CC, HS, HR, BasicClient<T, CC, HS, HR>> cml;
- ConnectionMultiListener.Builder<T, CC, HS, HR, BasicClient<T, CC, HS, HR> > builder =
- ConnectionMultiListener.newBuilder(connectionListener, handshakeValue, this);
+ ConnectionMultiListener.Builder<T, CC, HS, HR, BasicClient<T, CC, HS, HR>> builder =
+ ConnectionMultiListener.newBuilder(connectionListener, handshakeValue, this);
if (isSslEnabled()) {
cml = builder.enableSSL().build();
sslHandshakeListener = new ConnectionMultiListener.SSLHandshakeListener();
@@ -314,7 +408,7 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
protected final void consumeHandshake(ChannelHandlerContext ctx, HR msg) throws Exception {
// remove the handshake information from the queue so it doesn't sit there forever.
final RpcOutcome<HR> response =
- connection.getAndRemoveRpcOutcome(handshakeType.getNumber(), coordinationId, responseClass);
+ connection.getAndRemoveRpcOutcome(handshakeType.getNumber(), coordinationId, responseClass);
response.set(msg, null);
}
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 944e854..c296f39 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -17,6 +17,10 @@
*/
package org.apache.drill.exec.rpc;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.Channel;
@@ -26,6 +30,11 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import java.io.Closeable;
import java.net.SocketAddress;
@@ -34,17 +43,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
-import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import com.google.protobuf.Internal.EnumLite;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
/**
* The Rpc Bus deals with incoming and outgoing communication and is used on both the server and the client side of a
* system.
@@ -252,7 +250,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
@Override
protected void decode(final ChannelHandlerContext ctx, final InboundRpcMessage msg, final List<Object> output)
- throws Exception {
+ throws Exception {
if (!ctx.channel().isOpen()) {
return;
}
@@ -318,7 +316,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
break;
case PONG:
- // noop.
+ ctx.fireUserEventTriggered(RpcMode.PONG);
break;
default:
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
index 455ada8..4d9db54 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
@@ -34,6 +34,7 @@ public class RpcConstants {
public static final String MESSAGE_HANDLER = "message-handler";
public static final String EXCEPTION_HANDLER = "exception-handler";
public static final String IDLE_STATE_HANDLER = "idle-state-handler";
+ public static final String HEARTBEAT_HANDLER = "heartbeat-handler";
public static final String SASL_DECRYPTION_HANDLER = "sasl-decryption-handler";
public static final String SASL_ENCRYPTION_HANDLER = "sasl-encryption-handler";
public static final String LENGTH_DECODER_HANDLER = "length-decoder";