You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/15 08:42:53 UTC
[07/17] drill git commit: DRILL-3081: Populate connection name as
late as possible so RPC error messages are reported correctly.
DRILL-3081: Populate connection name as late as possible so RPC error messages are reported correctly.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4b0b3a67
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4b0b3a67
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4b0b3a67
Branch: refs/heads/master
Commit: 4b0b3a67ab5e2db2baf34250bdedb174fce648ad
Parents: f0b3671
Author: Parth Chandra <pa...@apache.org>
Authored: Thu May 14 12:27:40 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:53 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/rpc/BasicClient.java | 15 +++++++--
.../exec/rpc/BasicClientWithConnection.java | 1 +
.../org/apache/drill/exec/rpc/BasicServer.java | 16 +++++-----
.../apache/drill/exec/rpc/RemoteConnection.java | 8 +++--
.../java/org/apache/drill/exec/rpc/RpcBus.java | 32 +++++++++++++++-----
.../drill/exec/rpc/RpcExceptionHandler.java | 13 ++++----
.../drill/exec/rpc/control/ControlClient.java | 3 +-
.../drill/exec/rpc/control/ControlServer.java | 1 +
.../apache/drill/exec/rpc/data/DataClient.java | 1 +
.../apache/drill/exec/rpc/data/DataServer.java | 1 +
.../apache/drill/exec/rpc/user/UserServer.java | 1 +
11 files changed, 65 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index a33b370..cf09be3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -33,6 +33,7 @@ import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
+import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -46,7 +47,7 @@ import com.google.protobuf.Parser;
public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
extends RpcBus<T, R> {
- final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
// The percentage of time that should pass before sending a ping message to ensure server doesn't time us out. For
// example, if timeout is set to 30 seconds and we set percentage to 0.5, then if no write has happened within 15
@@ -101,7 +102,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
}
pipe.addLast("message-handler", new InboundHandler(connection));
- pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
+ pipe.addLast("exception-handler", new RpcExceptionHandler(connection));
}
}); //
@@ -110,6 +111,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
// }
}
+ public R initRemoteConnection(SocketChannel channel){
+ local=channel.localAddress();
+ remote=channel.remoteAddress();
+ return null;
+ };
+
private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);
/**
@@ -200,12 +207,14 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
// So there is no point propagating the interruption as failure immediately.
long remainingWaitTimeMills = 120000;
long startTime = System.currentTimeMillis();
-
// logger.debug("Connection operation finished. Success: {}", future.isSuccess());
while(true) {
try {
future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
if (future.isSuccess()) {
+ SocketAddress remote = future.channel().remoteAddress();
+ SocketAddress local = future.channel().localAddress();
+ setAddresses(remote, local);
// send a handshake on the current thread. This is the only time we will send from within the event thread.
// We can do this because the connection will not be backed up.
send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index ab54fa1..c194b5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -51,6 +51,7 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
@Override
public ServerConnection initRemoteConnection(SocketChannel channel) {
+ super.initRemoteConnection(channel);
return new ServerConnection(connectionName, channel, alloc);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 6a7bc65..5c04264 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -85,11 +85,11 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
if (rpcMapping.hasTimeout()) {
pipe.addLast(TIMEOUT_HANDLER,
- new LogggingReadTimeoutHandler(connection.getName(), rpcMapping.getTimeout()));
+ new LogggingReadTimeoutHandler(connection, rpcMapping.getTimeout()));
}
pipe.addLast("message-handler", new InboundHandler(connection));
- pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
+ pipe.addLast("exception-handler", new RpcExceptionHandler(connection));
connect = true;
// logger.debug("Server connection initialization completed.");
@@ -101,19 +101,19 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
// }
}
- private class LogggingReadTimeoutHandler extends ReadTimeoutHandler {
+ private class LogggingReadTimeoutHandler<C extends RemoteConnection> extends ReadTimeoutHandler {
- private final String name;
+ private final C connection;
private final int timeoutSeconds;
- public LogggingReadTimeoutHandler(String name, int timeoutSeconds) {
+ public LogggingReadTimeoutHandler(C connection, int timeoutSeconds) {
super(timeoutSeconds);
- this.name = name;
+ this.connection = connection;
this.timeoutSeconds = timeoutSeconds;
}
@Override
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
- logger.info("RPC connection {} timed out. Timeout was set to {} seconds. Closing connection.", name,
+ logger.info("RPC connection {} timed out. Timeout was set to {} seconds. Closing connection.", connection.getName(),
timeoutSeconds);
super.readTimedOut(ctx);
}
@@ -178,6 +178,8 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
@Override
public C initRemoteConnection(SocketChannel channel) {
+ local = channel.localAddress();
+ remote = channel.remoteAddress();
return null;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 199569c..30abcc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -33,7 +33,8 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
private final Channel channel;
private final WriteManager writeManager;
- private final String name;
+ private String name;
+ private final String clientName;
public boolean inEventLoop(){
return channel.eventLoop().inEventLoop();
@@ -42,7 +43,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
public RemoteConnection(SocketChannel channel, String name) {
super();
this.channel = channel;
- this.name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), name);
+ this.clientName = name;
this.writeManager = new WriteManager();
channel.pipeline().addLast(new BackPressureHandler());
channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@@ -57,6 +58,9 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
}
public String getName() {
+ if(name == null){
+ name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), clientName);
+ }
return name;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 1a23724..812b2fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -29,6 +29,7 @@ import io.netty.util.concurrent.GenericFutureListener;
import java.io.Closeable;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -67,10 +68,19 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
protected final RpcConfig rpcConfig;
+ protected volatile SocketAddress local;
+ protected volatile SocketAddress remote;
+
+
public RpcBus(RpcConfig rpcConfig) {
this.rpcConfig = rpcConfig;
}
+ protected void setAddresses(SocketAddress remote, SocketAddress local){
+ this.remote = remote;
+ this.local = local;
+ }
+
<SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
DrillRpcFutureImpl<RECEIVE> rpcFuture = new DrillRpcFutureImpl<RECEIVE>();
@@ -133,21 +143,27 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> {
- final InetSocketAddress local;
- final InetSocketAddress remote;
final C clientConnection;
+ private final Channel channel;
- public ChannelClosedHandler(C clientConnection, InetSocketAddress local, InetSocketAddress remote) {
- this.local = local;
- this.remote = remote;
+ public ChannelClosedHandler(C clientConnection, Channel channel) {
+ this.channel = channel;
this.clientConnection = clientConnection;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- String msg = String.format("Channel closed %s <--> %s.", local, remote);
+ String msg;
+ if(local!=null) {
+ msg = String.format("Channel closed %s <--> %s.", local, remote);
+ }else{
+ msg = String.format("Channel closed %s <--> %s.", future.channel().localAddress(), future.channel().remoteAddress());
+ }
+
if (RpcBus.this.isClient()) {
- logger.info(String.format(msg));
+ if(local != null) {
+ logger.info(String.format(msg));
+ }
} else {
queue.channelClosed(new ChannelClosedException(msg));
}
@@ -158,7 +174,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
}
protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel channel, C clientConnection) {
- return new ChannelClosedHandler(clientConnection, channel.localAddress(), channel.remoteAddress());
+ return new ChannelClosedHandler(clientConnection, channel);
}
private class ResponseSenderImpl implements ResponseSender {
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
index c12ff7b..46b7702 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
@@ -19,23 +19,24 @@ package org.apache.drill.exec.rpc;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
+import org.eclipse.jetty.io.Connection;
-public class RpcExceptionHandler implements ChannelHandler{
+public class RpcExceptionHandler<C extends RemoteConnection> implements ChannelHandler{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcExceptionHandler.class);
- private final String name;
+ private final C connection;
- public RpcExceptionHandler(String name) {
- this.name = name;
+ public RpcExceptionHandler(C connection){
+ this.connection = connection;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if(!ctx.channel().isOpen() || cause.getMessage().equals("Connection reset by peer")){
- logger.warn("Exception occurred with closed channel. Connection: {}", name, cause);
+ logger.warn("Exception occurred with closed channel. Connection: {}", connection.getName(), cause);
return;
}else{
- logger.error("Exception in RPC communication. Connection: {}. Closing connection.", name, cause);
+ logger.error("Exception in RPC communication. Connection: {}. Closing connection.", connection.getName(), cause);
ctx.close();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index f191271..159f1df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -40,7 +40,7 @@ import com.google.protobuf.MessageLite;
public class ControlClient extends BasicClient<RpcType, ControlConnection, BitControlHandshake, BitControlHandshake>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);
private final ControlMessageHandler handler;
private final DrillbitEndpoint remoteEndpoint;
@@ -66,6 +66,7 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
@SuppressWarnings("unchecked")
@Override
public ControlConnection initRemoteConnection(SocketChannel channel) {
+ super.initRemoteConnection(channel);
this.connection = new ControlConnection("control client", channel,
(RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
return connection;
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index 5e405ab..98ce9e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -69,6 +69,7 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
@Override
public ControlConnection initRemoteConnection(SocketChannel channel) {
+ super.initRemoteConnection(channel);
return new ControlConnection("control server", channel, this, allocator);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index b8a07c7..544bab9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -56,6 +56,7 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
@Override
public DataClientConnection initRemoteConnection(SocketChannel channel) {
+ super.initRemoteConnection(channel);
this.connection = new DataClientConnection(channel, this);
return connection;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 061ddcb..80d2d6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -77,6 +77,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
@Override
public BitServerConnection initRemoteConnection(SocketChannel channel) {
+ super.initRemoteConnection(channel);
return new BitServerConnection(channel, context.getAllocator());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 72b07ba..a197356 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -175,6 +175,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
@Override
public UserClientConnection initRemoteConnection(SocketChannel channel) {
+ super.initRemoteConnection(channel);
return new UserClientConnection(channel);
}