You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by lu...@apache.org on 2021/11/25 22:21:28 UTC

[drill] branch master updated: DRILL-8009: DrillConnectionImpl#isValid() doesn't correspond JDBC API

This is an automated email from the ASF dual-hosted git repository.

luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fb79a1  DRILL-8009: DrillConnectionImpl#isValid() doesn't correspond JDBC API
5fb79a1 is described below

commit 5fb79a17b14c59ef9114e4306bfdbabf315a591b
Author: Maksym Rymar <ri...@gmail.com>
AuthorDate: Wed Oct 13 18:45:31 2021 +0300

    DRILL-8009: DrillConnectionImpl#isValid() doesn't correspond JDBC API
---
 .../org/apache/drill/exec/client/DrillClient.java  |  22 ++
 .../drill/jdbc/impl/DrillConnectionImpl.java       |  20 +-
 .../org/apache/drill/jdbc/impl/DrillHandler.java   |   2 -
 ...rill2489CallsAfterCloseThrowExceptionsTest.java |  11 +-
 .../org/apache/drill/exec/rpc/BasicClient.java     | 274 ++++++++++++++-------
 .../java/org/apache/drill/exec/rpc/RpcBus.java     |  24 +-
 .../org/apache/drill/exec/rpc/RpcConstants.java    |   1 +
 7 files changed, 240 insertions(+), 114 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 49375c9..1e84608 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -40,6 +40,7 @@ import org.apache.drill.common.DrillAutoCloseables;
 import org.apache.drill.common.Version;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.coord.ClusterCoordinator;
@@ -49,6 +50,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -826,6 +828,26 @@ public class DrillClient implements Closeable, ConnectionThrottle {
     client.submitQuery(resultsListener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
   }
 
+  /**
+   * @return true if the client has a connection and that connection is active, false otherwise
+   */
+  public boolean connectionIsActive() {
+    return client.isActive();
+  }
+
+  /**
+   * Verifies the connection with a request-answer exchange.
+   *
+   * @param timeoutSec time in seconds to wait for the answer. If 0, does not wait.
+   * @return true if {@link GeneralRPCProtos.RpcMode#PONG PONG} is received before the timeout expires, false otherwise
+   */
+  public boolean hasPing(long timeoutSec) throws DrillRuntimeException {
+    if (timeoutSec < 0) {
+      timeoutSec = 0;
+    }
+    return client.hasPing(timeoutSec);
+  }
+
   private class ListHoldingResultsListener implements UserResultsListener {
     private final Vector<QueryDataBatch> results = new Vector<>();
     private final SettableFuture<List<QueryDataBatch>> future = SettableFuture.create();
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java
index 714a3ec..63f2f77 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java
@@ -168,7 +168,7 @@ public class DrillConnectionImpl extends AvaticaConnection implements DrillConne
         throw new SQLException("Failure in creating DrillConnectionImpl: " + e, e);
       }
     } catch (Throwable t) {
-      cleanup();
+      close();
       throw t;
     }
   }
@@ -248,7 +248,11 @@ public class DrillConnectionImpl extends AvaticaConnection implements DrillConne
   @Override
   public boolean isClosed() {
     try {
-      return super.isClosed();
+      if (super.isClosed()) {
+        return true;
+      } else {
+        return client != null ? !client.connectionIsActive() : false;
+      }
     } catch (SQLException e) {
       // Currently can't happen, since AvaticaConnection.isClosed() never throws
       // SQLException.
@@ -520,8 +524,11 @@ public class DrillConnectionImpl extends AvaticaConnection implements DrillConne
 
   @Override
   public boolean isValid(int timeout) throws SQLException {
-    checkOpen();
-    return super.isValid(timeout);
+    if (timeout < 0) {
+      throw new SQLException(String.format("Invalid timeout (%d<0).", timeout));
+    }
+    return !isClosed()
+      && client.hasPing(timeout);
   }
 
   @Override
@@ -625,8 +632,9 @@ public class DrillConnectionImpl extends AvaticaConnection implements DrillConne
     }
   }
 
-  // TODO this should be an AutoCloseable, and this should be close()
-  void cleanup() {
+  @Override
+  public void close() throws SQLException {
+    super.close();
     // First close any open JDBC Statement objects, to close any open ResultSet
     // objects and release their buffers/vectors.
     openStatementsRegistry.close();
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillHandler.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillHandler.java
index 84fed2b..7b84be8 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillHandler.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillHandler.java
@@ -30,8 +30,6 @@ public class DrillHandler implements Handler {
 
   @Override
   public void onConnectionClose(AvaticaConnection c) throws RuntimeException {
-    DrillConnectionImpl connection = (DrillConnectionImpl) c;
-    connection.cleanup();
   }
 
   @Override
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java
index 657c86d..e18ad7f 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java
@@ -294,9 +294,14 @@ public class Drill2489CallsAfterCloseThrowExceptionsTest extends JdbcTestBase {
      * Reports whether it's okay if given method didn't throw any exception.
      */
     protected boolean isOkayNonthrowingMethod(Method method) {
-       return
-           "isClosed".equals(method.getName())
-           || "close".equals(method.getName());
+       switch (method.getName()) {
+         case "isClosed":
+         case "close":
+         case "isValid":
+           return true;
+         default:
+           return false;
+       }
     }
 
     /**
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 30be9c7..bd4c0ef 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.rpc;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
@@ -26,6 +25,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
@@ -34,30 +34,34 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
 import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.TimeUnit;
 
 /**
- *
- * @param <T> handshake rpc type
+ * @param <T>  handshake rpc type
  * @param <CC> Client connection type
  * @param <HS> Handshake send type
  * @param <HR> Handshake receive type
  */
 public abstract class BasicClient<T extends EnumLite, CC extends ClientConnection,
-                                  HS extends MessageLite, HR extends MessageLite>
-    extends RpcBus<T, CC> {
+  HS extends MessageLite, HR extends MessageLite>
+  extends RpcBus<T, CC> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
 
   // The percentage of time that should pass before sending a ping message to ensure server doesn't time us out. For
@@ -71,7 +75,7 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
   private final Class<HR> responseClass;
   private final Parser<HR> handshakeParser;
 
-  private final IdlePingHandler pingHandler;
+  private HeartBeatHandler heartBeatHandler;
   private ConnectionMultiListener.SSLHandshakeListener sslHandshakeListener = null;
 
   // Determines if authentication is completed between client and server
@@ -83,48 +87,48 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
     this.responseClass = responseClass;
     this.handshakeType = handshakeType;
     this.handshakeParser = handshakeParser;
-    final long timeoutInMillis = rpcMapping.hasTimeout() ?
-        (long) (rpcMapping.getTimeout() * 1000.0 * PERCENT_TIMEOUT_BEFORE_SENDING_PING) :
-        -1;
-    this.pingHandler = rpcMapping.hasTimeout() ? new IdlePingHandler(timeoutInMillis) : null;
+    final int readIdleSec = rpcMapping.hasTimeout() ?
+            (int) (rpcMapping.getTimeout() * PERCENT_TIMEOUT_BEFORE_SENDING_PING) : -1;
+    IdleStateHandler idleStateHandler = rpcMapping.hasTimeout() ? new IdleStateHandler(readIdleSec, 0, 0) : null;
+
+    final int heartbeatWaitSec = rpcMapping.hasTimeout() ? rpcMapping.getTimeout() - readIdleSec : -1;
+    HeartBeatHandler heartBeatHandler = this.heartBeatHandler = new HeartBeatHandler(heartbeatWaitSec);
 
     b = new Bootstrap() //
-        .group(eventLoopGroup) //
-        .channel(TransportCheck.getClientSocketChannel()) //
-        .option(ChannelOption.ALLOCATOR, alloc) //
-        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000)
-        .option(ChannelOption.SO_REUSEADDR, true)
-        .option(ChannelOption.SO_RCVBUF, 1 << 17) //
-        .option(ChannelOption.SO_SNDBUF, 1 << 17) //
-        .option(ChannelOption.TCP_NODELAY, true)
-        .handler(new ChannelInitializer<SocketChannel>() {
-
-          @Override
-          protected void initChannel(SocketChannel ch) throws Exception {
-            // logger.debug("initializing client connection.");
-            connection = initRemoteConnection(ch);
-
-            ch.closeFuture().addListener(getCloseHandler(ch, connection));
-
-            final ChannelPipeline pipe = ch.pipeline();
-            // Make sure that the SSL handler is the first handler in the pipeline so everything is encrypted
-            if (isSslEnabled()) {
-              setupSSL(pipe, sslHandshakeListener);
-            }
-
-            pipe.addLast(RpcConstants.PROTOCOL_DECODER, getDecoder(connection.getAllocator()));
-            pipe.addLast(RpcConstants.MESSAGE_DECODER, new RpcDecoder("c-" + rpcConfig.getName()));
-            pipe.addLast(RpcConstants.PROTOCOL_ENCODER, new RpcEncoder("c-" + rpcConfig.getName()));
-            pipe.addLast(RpcConstants.HANDSHAKE_HANDLER, new ClientHandshakeHandler(connection));
-
-            if(pingHandler != null){
-              pipe.addLast(RpcConstants.IDLE_STATE_HANDLER, pingHandler);
-            }
-
-            pipe.addLast(RpcConstants.MESSAGE_HANDLER, new InboundHandler(connection));
-            pipe.addLast(RpcConstants.EXCEPTION_HANDLER, new RpcExceptionHandler<>(connection));
+      .group(eventLoopGroup) //
+      .channel(TransportCheck.getClientSocketChannel()) //
+      .option(ChannelOption.ALLOCATOR, alloc) //
+      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000)
+      .option(ChannelOption.SO_REUSEADDR, true)
+      .option(ChannelOption.SO_RCVBUF, 1 << 17) //
+      .option(ChannelOption.SO_SNDBUF, 1 << 17) //
+      .option(ChannelOption.TCP_NODELAY, true)
+      .handler(new ChannelInitializer<SocketChannel>() {
+
+        @Override
+        protected void initChannel(SocketChannel ch) throws Exception {
+          // logger.debug("initializing client connection.");
+          connection = initRemoteConnection(ch);
+
+          ch.closeFuture().addListener(getCloseHandler(ch, connection));
+
+          final ChannelPipeline pipe = ch.pipeline();
+          // Make sure that the SSL handler is the first handler in the pipeline so everything is encrypted
+          if (isSslEnabled()) {
+            setupSSL(pipe, sslHandshakeListener);
+          }
+          if (idleStateHandler != null) {
+            pipe.addLast(RpcConstants.IDLE_STATE_HANDLER, idleStateHandler);
           }
-        }); //
+          pipe.addLast(RpcConstants.PROTOCOL_DECODER, getDecoder(connection.getAllocator()));
+          pipe.addLast(RpcConstants.MESSAGE_DECODER, new RpcDecoder("c-" + rpcConfig.getName()));
+          pipe.addLast(RpcConstants.PROTOCOL_ENCODER, new RpcEncoder("c-" + rpcConfig.getName()));
+          pipe.addLast(RpcConstants.HANDSHAKE_HANDLER, new ClientHandshakeHandler(connection));
+          pipe.addLast(RpcConstants.MESSAGE_HANDLER, new InboundHandler(connection));
+          pipe.addLast(RpcConstants.HEARTBEAT_HANDLER, heartBeatHandler);
+          pipe.addLast(RpcConstants.EXCEPTION_HANDLER, new RpcExceptionHandler<>(connection));
+        }
+      }); //
 
     // if(TransportCheck.SUPPORTS_EPOLL){
     // b.option(EpollChannelOption.SO_REUSEPORT, true); //
@@ -143,6 +147,7 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
 
   /**
    * Set's the state for authentication complete.
+   *
    * @param authComplete - state to set. True means authentication between client and server is completed, false
    *                     means authentication is in progress.
    */
@@ -160,38 +165,125 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
   }
 
   @Override
-  protected CC initRemoteConnection(SocketChannel channel){
-    local=channel.localAddress();
-    remote=channel.remoteAddress();
+  protected CC initRemoteConnection(SocketChannel channel) {
+    local = channel.localAddress();
+    remote = channel.remoteAddress();
     return null;
   }
 
-  private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);
-
   /**
-   * Handler that watches for situations where we haven't read from the socket in a certain timeout. If we exceed this
-   * timeout, we send a PING message to the server to state that we are still alive.
+   * Handler that watches for the {@link IdleState#READER_IDLE IdleState.READER_IDLE} user event, sends a
+   * {@link RpcMode#PING RpcMode.PING} message to the server, and waits for the {@link RpcMode#PONG RpcMode.PONG} answer.
+   * The handler treats the {@link RpcMode#PONG RpcMode.PONG} user event fired by
+   * {@link org.apache.drill.exec.rpc.RpcBus.InboundHandler} as the signal that the answer was received. If no answer
+   * arrives before the answerWaitSec timeout, then the handler closes the connection.
    */
-  private class IdlePingHandler extends IdleStateHandler {
+  private class HeartBeatHandler extends ChannelInboundHandlerAdapter {
+    private final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);
+    private final int answerWaitSec;
+    private final Queue<Pair<Promise<Boolean>, ScheduledFuture>> pongFutures = new LinkedList<>();
+    private ChannelHandlerContext ctx;
+
+    /**
+     * @param answerWaitSec timeout in seconds to wait for an answer from the server
+     */
+    public HeartBeatHandler(int answerWaitSec) {
+      this.answerWaitSec = answerWaitSec;
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+      this.ctx = ctx;
+      super.handlerAdded(ctx);
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+      if (evt instanceof IdleStateEvent) {
+        IdleStateEvent idleState = (IdleStateEvent) evt;
+        if (idleState.state() == IdleState.READER_IDLE) {
+          idleEvent();
+        }
+      } else if (evt instanceof RpcMode) {
+        RpcMode rpcMode = (RpcMode) evt;
+        if (rpcMode == RpcMode.PONG) {
+          pongReceived();
+        }
+      }
+      ctx.fireUserEventTriggered(evt);
+    }
+
+    private void idleEvent() {
+      EventExecutor executor = ctx.executor();
+
+      Promise<Boolean> pongReceived = executor.newPromise();
+      ScheduledFuture<?> pongTimeoutChecker = null;
+
+      if (answerWaitSec > 0) {
+        pongTimeoutChecker = executor.schedule(() -> {
+          if (!pongReceived.isSuccess()) {
+            logger.error("Unable to get an answer from the server. Timeout: {} seconds. Connection: {}.  " +
+              "Closing connection.", answerWaitSec, connection.getName());
+            connection.close();
+          }
+        }, answerWaitSec, TimeUnit.SECONDS);
+      }
+      pongFutures.add(Pair.of(pongReceived, pongTimeoutChecker));
+
+      sendPing();
+    }
 
-    private GenericFutureListener<Future<? super Void>> pingFailedHandler = new GenericFutureListener<Future<? super Void>>() {
-      public void operationComplete(Future<? super Void> future) throws Exception {
+    private void sendPing() {
+      ctx.channel().writeAndFlush(PING_MESSAGE).addListener(future -> {
         if (!future.isSuccess()) {
           logger.error("Unable to maintain connection {}.  Closing connection.", connection.getName());
-          connection.close();
+          close();
+        }
+      });
+    }
+
+    private void pongReceived() {
+      Pair<Promise<Boolean>, ScheduledFuture> pongFuture = pongFutures.poll();
+      if (pongFuture != null) {
+        Promise<Boolean> pongReceived = pongFuture.getLeft();
+        pongReceived.setSuccess(true);
+        ScheduledFuture pongTimeoutChecker = pongFuture.getRight();
+        if (pongTimeoutChecker != null) {
+          pongTimeoutChecker.cancel(false);
         }
       }
-    };
+    }
 
-    IdlePingHandler(long idleWaitInMillis) {
-      super(0, idleWaitInMillis, 0, TimeUnit.MILLISECONDS);
+    public Promise<Boolean> demandHeartbeat() {
+      EventExecutor executor = this.ctx.executor();
+      Promise<Boolean> pongReceived = executor.newPromise();
+      pongFutures.add(Pair.of(pongReceived, null));
+      sendPing();
+      return pongReceived;
     }
+  }
 
-    @Override
-    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
-      if (evt.state() == IdleState.WRITER_IDLE) {
-        ctx.writeAndFlush(PING_MESSAGE).addListener(pingFailedHandler);
-      }
+  /**
+   * Sends a request and waits for the answer to verify the connection.
+   *
+   * @param timeoutSec time in seconds to wait for the answer. If 0, does not wait.
+   * @return true if the answer is received before the timeout expires, false otherwise
+   */
+  public boolean hasPing(long timeoutSec) {
+    if (timeoutSec < 0) {
+      timeoutSec = 0;
+    }
+    try {
+      return heartBeatHandler.demandHeartbeat().await(timeoutSec, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      logger.warn("Heartbeat wait was interrupted.");
+
+      // Preserve evidence that the interruption occurred so that code higher up
+      // on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
+      return false;
     }
   }
 
@@ -206,7 +298,8 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
   /**
    * Creates various instances needed to start the SASL handshake. This is called from
    * {@link BasicClient#validateHandshake(MessageLite)} if authentication is required from server side.
-   * @param connectionHandler - Connection handler used by client's to know about success/failure conditions.
+   *
+   * @param connectionHandler    - Connection handler used by client's to know about success/failure conditions.
    * @param serverAuthMechanisms - List of auth mechanisms configured on server side
    */
   protected abstract void prepareSaslHandshake(final RpcConnectionHandler<CC> connectionHandler,
@@ -217,11 +310,12 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
    * after regular RPC handshake that authentication is required by server side. Once authentication is completed
    * then only the underlying channel is made available to clients to send other RPC messages. Success and failure
    * events are notified to the connection handler on which client waits.
+   *
    * @param connectionHandler - Connection handler used by client's to know about success/failure conditions.
-   * @param saslProperties - SASL related properties needed to create SASL client.
-   * @param ugi - UserGroupInformation with logged in client side user
-   * @param authFactory - Authentication factory to use for this SASL handshake.
-   * @param rpcType - SASL_MESSAGE rpc type.
+   * @param saslProperties    - SASL related properties needed to create SASL client.
+   * @param ugi               - UserGroupInformation with logged in client side user
+   * @param authFactory       - Authentication factory to use for this SASL handshake.
+   * @param rpcType           - SASL_MESSAGE rpc type.
    */
   protected void startSaslHandshake(final RpcConnectionHandler<CC> connectionHandler,
                                     Map<String, ?> saslProperties, UserGroupInformation ugi,
@@ -245,22 +339,22 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
     logger.debug("Initiating SASL exchange.");
     new AuthenticationOutcomeListener<>(this, connection, rpcType, ugi,
       new RpcOutcomeListener<Void>() {
-      @Override
-      public void failed(RpcException ex) {
-        connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
-      }
+        @Override
+        public void failed(RpcException ex) {
+          connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
+        }
 
-      @Override
-      public void success(Void value, ByteBuf buffer) {
-        authComplete = true;
-        connectionHandler.connectionSucceeded(connection);
-      }
+        @Override
+        public void success(Void value, ByteBuf buffer) {
+          authComplete = true;
+          connectionHandler.connectionSucceeded(connection);
+        }
 
-      @Override
-      public void interrupted(InterruptedException ex) {
-        connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
-      }
-    }).initiate(mechanismName);
+        @Override
+        public void interrupted(InterruptedException ex) {
+          connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
+        }
+      }).initiate(mechanismName);
   }
 
   protected void finalizeConnection(HR handshake, CC connection) {
@@ -280,16 +374,16 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
 
   public <SEND extends MessageLite, RECEIVE extends MessageLite>
   void send(RpcOutcomeListener<RECEIVE> listener, SEND protobufBody, boolean allowInEventLoop,
-      ByteBuf... dataBodies) {
+            ByteBuf... dataBodies) {
     super.send(listener, connection, handshakeType, protobufBody, (Class<RECEIVE>) responseClass,
-        allowInEventLoop, dataBodies);
+      allowInEventLoop, dataBodies);
   }
 
   protected void connectAsClient(RpcConnectionHandler<CC> connectionListener, HS handshakeValue,
                                  String host, int port) {
     ConnectionMultiListener<T, CC, HS, HR, BasicClient<T, CC, HS, HR>> cml;
-    ConnectionMultiListener.Builder<T, CC, HS, HR, BasicClient<T, CC, HS, HR> > builder =
-        ConnectionMultiListener.newBuilder(connectionListener, handshakeValue, this);
+    ConnectionMultiListener.Builder<T, CC, HS, HR, BasicClient<T, CC, HS, HR>> builder =
+      ConnectionMultiListener.newBuilder(connectionListener, handshakeValue, this);
     if (isSslEnabled()) {
       cml = builder.enableSSL().build();
       sslHandshakeListener = new ConnectionMultiListener.SSLHandshakeListener();
@@ -314,7 +408,7 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
     protected final void consumeHandshake(ChannelHandlerContext ctx, HR msg) throws Exception {
       // remove the handshake information from the queue so it doesn't sit there forever.
       final RpcOutcome<HR> response =
-          connection.getAndRemoveRpcOutcome(handshakeType.getNumber(), coordinationId, responseClass);
+        connection.getAndRemoveRpcOutcome(handshakeType.getNumber(), coordinationId, responseClass);
       response.set(msg, null);
     }
 
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 944e854..c296f39 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -17,6 +17,10 @@
  */
 package org.apache.drill.exec.rpc;
 
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
 import io.netty.channel.Channel;
@@ -26,6 +30,11 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
 import java.io.Closeable;
 import java.net.SocketAddress;
@@ -34,17 +43,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
-import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import com.google.protobuf.Internal.EnumLite;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
 /**
  * The Rpc Bus deals with incoming and outgoing communication and is used on both the server and the client side of a
  * system.
@@ -252,7 +250,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
     @Override
     protected void decode(final ChannelHandlerContext ctx, final InboundRpcMessage msg, final List<Object> output)
-        throws Exception {
+            throws Exception {
       if (!ctx.channel().isOpen()) {
         return;
       }
@@ -318,7 +316,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
           break;
 
         case PONG:
-          // noop.
+          ctx.fireUserEventTriggered(RpcMode.PONG);
           break;
 
         default:
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
index 455ada8..4d9db54 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
@@ -34,6 +34,7 @@ public class RpcConstants {
   public static final String MESSAGE_HANDLER = "message-handler";
   public static final String EXCEPTION_HANDLER = "exception-handler";
   public static final String IDLE_STATE_HANDLER = "idle-state-handler";
+  public static final String HEARTBEAT_HANDLER = "heartbeat-handler";
   public static final String SASL_DECRYPTION_HANDLER = "sasl-decryption-handler";
   public static final String SASL_ENCRYPTION_HANDLER = "sasl-encryption-handler";
   public static final String LENGTH_DECODER_HANDLER = "length-decoder";