You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/15 08:42:53 UTC

[07/17] drill git commit: DRILL-3081: Populate connection name as late as possible so RPC error messages are reported correctly.

DRILL-3081: Populate connection name as late as possible so RPC error messages are reported correctly.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4b0b3a67
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4b0b3a67
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4b0b3a67

Branch: refs/heads/master
Commit: 4b0b3a67ab5e2db2baf34250bdedb174fce648ad
Parents: f0b3671
Author: Parth Chandra <pa...@apache.org>
Authored: Thu May 14 12:27:40 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:53 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/rpc/BasicClient.java  | 15 +++++++--
 .../exec/rpc/BasicClientWithConnection.java     |  1 +
 .../org/apache/drill/exec/rpc/BasicServer.java  | 16 +++++-----
 .../apache/drill/exec/rpc/RemoteConnection.java |  8 +++--
 .../java/org/apache/drill/exec/rpc/RpcBus.java  | 32 +++++++++++++++-----
 .../drill/exec/rpc/RpcExceptionHandler.java     | 13 ++++----
 .../drill/exec/rpc/control/ControlClient.java   |  3 +-
 .../drill/exec/rpc/control/ControlServer.java   |  1 +
 .../apache/drill/exec/rpc/data/DataClient.java  |  1 +
 .../apache/drill/exec/rpc/data/DataServer.java  |  1 +
 .../apache/drill/exec/rpc/user/UserServer.java  |  1 +
 11 files changed, 65 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index a33b370..cf09be3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -33,6 +33,7 @@ import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
+import java.net.SocketAddress;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -46,7 +47,7 @@ import com.google.protobuf.Parser;
 
 public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
     extends RpcBus<T, R> {
-  final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
 
   // The percentage of time that should pass before sending a ping message to ensure server doesn't time us out. For
   // example, if timeout is set to 30 seconds and we set percentage to 0.5, then if no write has happened within 15
@@ -101,7 +102,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
             }
 
             pipe.addLast("message-handler", new InboundHandler(connection));
-            pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
+            pipe.addLast("exception-handler", new RpcExceptionHandler(connection));
           }
         }); //
 
@@ -110,6 +111,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     // }
   }
 
+  public R initRemoteConnection(SocketChannel channel){
+    local=channel.localAddress();
+    remote=channel.remoteAddress();
+    return null;
+  };
+
   private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);
 
   /**
@@ -200,12 +207,14 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
         // So there is no point propagating the interruption as failure immediately.
         long remainingWaitTimeMills = 120000;
         long startTime = System.currentTimeMillis();
-
         // logger.debug("Connection operation finished.  Success: {}", future.isSuccess());
         while(true) {
           try {
             future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
             if (future.isSuccess()) {
+              SocketAddress remote = future.channel().remoteAddress();
+              SocketAddress local = future.channel().localAddress();
+              setAddresses(remote, local);
               // send a handshake on the current thread. This is the only time we will send from within the event thread.
               // We can do this because the connection will not be backed up.
               send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index ab54fa1..c194b5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -51,6 +51,7 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
 
   @Override
   public ServerConnection initRemoteConnection(SocketChannel channel) {
+    super.initRemoteConnection(channel);
     return new ServerConnection(connectionName, channel, alloc);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 6a7bc65..5c04264 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -85,11 +85,11 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
 
             if (rpcMapping.hasTimeout()) {
               pipe.addLast(TIMEOUT_HANDLER,
-                  new LogggingReadTimeoutHandler(connection.getName(), rpcMapping.getTimeout()));
+                  new LogggingReadTimeoutHandler(connection, rpcMapping.getTimeout()));
             }
 
             pipe.addLast("message-handler", new InboundHandler(connection));
-            pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
+            pipe.addLast("exception-handler", new RpcExceptionHandler(connection));
 
             connect = true;
 //            logger.debug("Server connection initialization completed.");
@@ -101,19 +101,19 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
 //     }
   }
 
-  private class LogggingReadTimeoutHandler extends ReadTimeoutHandler {
+  private class LogggingReadTimeoutHandler<C extends RemoteConnection> extends ReadTimeoutHandler {
 
-    private final String name;
+    private final C connection;
     private final int timeoutSeconds;
-    public LogggingReadTimeoutHandler(String name, int timeoutSeconds) {
+    public LogggingReadTimeoutHandler(C connection, int timeoutSeconds) {
       super(timeoutSeconds);
-      this.name = name;
+      this.connection = connection;
       this.timeoutSeconds = timeoutSeconds;
     }
 
     @Override
     protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
-      logger.info("RPC connection {} timed out.  Timeout was set to {} seconds. Closing connection.", name,
+      logger.info("RPC connection {} timed out.  Timeout was set to {} seconds. Closing connection.", connection.getName(),
           timeoutSeconds);
       super.readTimedOut(ctx);
     }
@@ -178,6 +178,8 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
 
   @Override
   public C initRemoteConnection(SocketChannel channel) {
+    local = channel.localAddress();
+    remote = channel.remoteAddress();
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 199569c..30abcc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -33,7 +33,8 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
   private final Channel channel;
   private final WriteManager writeManager;
-  private final String name;
+  private String name;
+  private final String clientName;
 
   public boolean inEventLoop(){
     return channel.eventLoop().inEventLoop();
@@ -42,7 +43,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
   public RemoteConnection(SocketChannel channel, String name) {
     super();
     this.channel = channel;
-    this.name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), name);
+    this.clientName = name;
     this.writeManager = new WriteManager();
     channel.pipeline().addLast(new BackPressureHandler());
     channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@@ -57,6 +58,9 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
   }
 
   public String getName() {
+    if(name == null){
+      name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), clientName);
+    }
     return name;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 1a23724..812b2fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -29,6 +29,7 @@ import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.Closeable;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -67,10 +68,19 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
   protected final RpcConfig rpcConfig;
 
+  protected volatile SocketAddress local;
+  protected volatile SocketAddress remote;
+
+
   public RpcBus(RpcConfig rpcConfig) {
     this.rpcConfig = rpcConfig;
   }
 
+  protected void setAddresses(SocketAddress remote, SocketAddress local){
+    this.remote = remote;
+    this.local = local;
+  }
+
   <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
       SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
     DrillRpcFutureImpl<RECEIVE> rpcFuture = new DrillRpcFutureImpl<RECEIVE>();
@@ -133,21 +143,27 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
   public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> {
 
-    final InetSocketAddress local;
-    final InetSocketAddress remote;
     final C clientConnection;
+    private final Channel channel;
 
-    public ChannelClosedHandler(C clientConnection, InetSocketAddress local, InetSocketAddress remote) {
-      this.local = local;
-      this.remote = remote;
+    public ChannelClosedHandler(C clientConnection, Channel channel) {
+      this.channel = channel;
       this.clientConnection = clientConnection;
     }
 
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
-      String msg = String.format("Channel closed %s <--> %s.", local, remote);
+      String msg;
+      if(local!=null) {
+        msg = String.format("Channel closed %s <--> %s.", local, remote);
+      }else{
+        msg = String.format("Channel closed %s <--> %s.", future.channel().localAddress(), future.channel().remoteAddress());
+      }
+
       if (RpcBus.this.isClient()) {
-        logger.info(String.format(msg));
+        if(local != null) {
+          logger.info(String.format(msg));
+        }
       } else {
         queue.channelClosed(new ChannelClosedException(msg));
       }
@@ -158,7 +174,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
   }
 
   protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel channel, C clientConnection) {
-    return new ChannelClosedHandler(clientConnection, channel.localAddress(), channel.remoteAddress());
+    return new ChannelClosedHandler(clientConnection, channel);
   }
 
   private class ResponseSenderImpl implements ResponseSender {

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
index c12ff7b..46b7702 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
@@ -19,23 +19,24 @@ package org.apache.drill.exec.rpc;
 
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
+import org.eclipse.jetty.io.Connection;
 
-public class RpcExceptionHandler implements ChannelHandler{
+public class RpcExceptionHandler<C extends RemoteConnection> implements ChannelHandler{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcExceptionHandler.class);
 
-  private final String name;
+  private final C connection;
 
-  public RpcExceptionHandler(String name) {
-    this.name = name;
+  public RpcExceptionHandler(C connection){
+    this.connection = connection;
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
     if(!ctx.channel().isOpen() || cause.getMessage().equals("Connection reset by peer")){
-      logger.warn("Exception occurred with closed channel.  Connection: {}", name, cause);
+      logger.warn("Exception occurred with closed channel.  Connection: {}", connection.getName(), cause);
       return;
     }else{
-      logger.error("Exception in RPC communication.  Connection: {}.  Closing connection.", name, cause);
+      logger.error("Exception in RPC communication.  Connection: {}.  Closing connection.", connection.getName(), cause);
       ctx.close();
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index f191271..159f1df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -40,7 +40,7 @@ import com.google.protobuf.MessageLite;
 
 public class ControlClient extends BasicClient<RpcType, ControlConnection, BitControlHandshake, BitControlHandshake>{
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);
 
   private final ControlMessageHandler handler;
   private final DrillbitEndpoint remoteEndpoint;
@@ -66,6 +66,7 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
   @SuppressWarnings("unchecked")
   @Override
   public ControlConnection initRemoteConnection(SocketChannel channel) {
+    super.initRemoteConnection(channel);
     this.connection = new ControlConnection("control client", channel,
         (RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
     return connection;

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index 5e405ab..98ce9e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -69,6 +69,7 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
 
   @Override
   public ControlConnection initRemoteConnection(SocketChannel channel) {
+    super.initRemoteConnection(channel);
     return new ControlConnection("control server", channel, this, allocator);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index b8a07c7..544bab9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -56,6 +56,7 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
 
   @Override
   public DataClientConnection initRemoteConnection(SocketChannel channel) {
+    super.initRemoteConnection(channel);
     this.connection = new DataClientConnection(channel, this);
     return connection;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 061ddcb..80d2d6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -77,6 +77,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
 
   @Override
   public BitServerConnection initRemoteConnection(SocketChannel channel) {
+    super.initRemoteConnection(channel);
     return new BitServerConnection(channel, context.getAllocator());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 72b07ba..a197356 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -175,6 +175,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
 
   @Override
   public UserClientConnection initRemoteConnection(SocketChannel channel) {
+    super.initRemoteConnection(channel);
     return new UserClientConnection(channel);
   }