You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/09 17:51:47 UTC
[1/3] drill git commit: DRILL-2971, DRILL-2886, DRILL-2778,
DRILL-2545: Improve RPC connection detection failure. Add RPC timeout.
Repository: drill
Updated Branches:
refs/heads/master d4f9bf2e9 -> 9d2096229
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
index c072a47..c3ff58b 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
@@ -1836,6 +1836,16 @@ public final class UserProtos {
* <code>optional bool support_complex_types = 6 [default = false];</code>
*/
boolean getSupportComplexTypes();
+
+ // optional bool support_timeout = 7 [default = false];
+ /**
+ * <code>optional bool support_timeout = 7 [default = false];</code>
+ */
+ boolean hasSupportTimeout();
+ /**
+ * <code>optional bool support_timeout = 7 [default = false];</code>
+ */
+ boolean getSupportTimeout();
}
/**
* Protobuf type {@code exec.user.UserToBitHandshake}
@@ -1940,6 +1950,11 @@ public final class UserProtos {
supportComplexTypes_ = input.readBool();
break;
}
+ case 56: {
+ bitField0_ |= 0x00000040;
+ supportTimeout_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2088,6 +2103,22 @@ public final class UserProtos {
return supportComplexTypes_;
}
+ // optional bool support_timeout = 7 [default = false];
+ public static final int SUPPORT_TIMEOUT_FIELD_NUMBER = 7;
+ private boolean supportTimeout_;
+ /**
+ * <code>optional bool support_timeout = 7 [default = false];</code>
+ */
+ public boolean hasSupportTimeout() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * <code>optional bool support_timeout = 7 [default = false];</code>
+ */
+ public boolean getSupportTimeout() {
+ return supportTimeout_;
+ }
+
private void initFields() {
channel_ = org.apache.drill.exec.proto.UserBitShared.RpcChannel.USER;
supportListening_ = false;
@@ -2095,6 +2126,7 @@ public final class UserProtos {
credentials_ = org.apache.drill.exec.proto.UserBitShared.UserCredentials.getDefaultInstance();
properties_ = org.apache.drill.exec.proto.UserProtos.UserProperties.getDefaultInstance();
supportComplexTypes_ = false;
+ supportTimeout_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -2132,6 +2164,9 @@ public final class UserProtos {
if (((bitField0_ & 0x00000020) == 0x00000020)) {
output.writeBool(6, supportComplexTypes_);
}
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ output.writeBool(7, supportTimeout_);
+ }
getUnknownFields().writeTo(output);
}
@@ -2165,6 +2200,10 @@ public final class UserProtos {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(6, supportComplexTypes_);
}
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(7, supportTimeout_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -2303,6 +2342,8 @@ public final class UserProtos {
bitField0_ = (bitField0_ & ~0x00000010);
supportComplexTypes_ = false;
bitField0_ = (bitField0_ & ~0x00000020);
+ supportTimeout_ = false;
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -2363,6 +2404,10 @@ public final class UserProtos {
to_bitField0_ |= 0x00000020;
}
result.supportComplexTypes_ = supportComplexTypes_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000040;
+ }
+ result.supportTimeout_ = supportTimeout_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -2397,6 +2442,9 @@ public final class UserProtos {
if (other.hasSupportComplexTypes()) {
setSupportComplexTypes(other.getSupportComplexTypes());
}
+ if (other.hasSupportTimeout()) {
+ setSupportTimeout(other.getSupportTimeout());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -2799,6 +2847,39 @@ public final class UserProtos {
return this;
}
+ // optional bool support_timeout = 7 [default = false];
+ private boolean supportTimeout_ ;
+ /**
+ * <code>optional bool support_timeout = 7 [default = false];</code>
+ */
+ public boolean hasSupportTimeout() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * <code>optional bool support_timeout = 7 [default = false];</code>
+ */
+ public boolean getSupportTimeout() {
+ return supportTimeout_;
+ }
+ /**
+ * <code>optional bool support_timeout = 7 [default = false];</code>
+ */
+ public Builder setSupportTimeout(boolean value) {
+ bitField0_ |= 0x00000040;
+ supportTimeout_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool support_timeout = 7 [default = false];</code>
+ */
+ public Builder clearSupportTimeout() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ supportTimeout_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:exec.user.UserToBitHandshake)
}
@@ -4890,31 +4971,31 @@ public final class UserProtos {
"\032\023UserBitShared.proto\"&\n\010Property\022\013\n\003key" +
"\030\001 \002(\t\022\r\n\005value\030\002 \002(\t\"9\n\016UserProperties\022" +
"\'\n\nproperties\030\001 \003(\0132\023.exec.user.Property" +
- "\"\374\001\n\022UserToBitHandshake\022.\n\007channel\030\001 \001(\016" +
+ "\"\234\002\n\022UserToBitHandshake\022.\n\007channel\030\001 \001(\016" +
"2\027.exec.shared.RpcChannel:\004USER\022\031\n\021suppo" +
"rt_listening\030\002 \001(\010\022\023\n\013rpc_version\030\003 \001(\005\022" +
"1\n\013credentials\030\004 \001(\0132\034.exec.shared.UserC" +
"redentials\022-\n\nproperties\030\005 \001(\0132\031.exec.us" +
"er.UserProperties\022$\n\025support_complex_typ",
- "es\030\006 \001(\010:\005false\"S\n\016RequestResults\022&\n\010que" +
- "ry_id\030\001 \001(\0132\024.exec.shared.QueryId\022\031\n\021max" +
- "imum_responses\030\002 \001(\005\"q\n\010RunQuery\0221\n\014resu" +
- "lts_mode\030\001 \001(\0162\033.exec.user.QueryResultsM" +
- "ode\022$\n\004type\030\002 \001(\0162\026.exec.shared.QueryTyp" +
- "e\022\014\n\004plan\030\003 \001(\t\"|\n\022BitToUserHandshake\022\023\n" +
- "\013rpc_version\030\002 \001(\005\022*\n\006status\030\003 \001(\0162\032.exe" +
- "c.user.HandshakeStatus\022\017\n\007errorId\030\004 \001(\t\022" +
- "\024\n\014errorMessage\030\005 \001(\t*\310\001\n\007RpcType\022\r\n\tHAN" +
- "DSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\r\n\tRUN_QU",
- "ERY\020\003\022\020\n\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESUL" +
- "TS\020\005\022\016\n\nQUERY_DATA\020\006\022\020\n\014QUERY_HANDLE\020\007\022\026" +
- "\n\022REQ_META_FUNCTIONS\020\010\022\026\n\022RESP_FUNCTION_" +
- "LIST\020\t\022\020\n\014QUERY_RESULT\020\n*#\n\020QueryResults" +
- "Mode\022\017\n\013STREAM_FULL\020\001*^\n\017HandshakeStatus" +
- "\022\013\n\007SUCCESS\020\001\022\030\n\024RPC_VERSION_MISMATCH\020\002\022" +
- "\017\n\013AUTH_FAILED\020\003\022\023\n\017UNKNOWN_FAILURE\020\004B+\n" +
- "\033org.apache.drill.exec.protoB\nUserProtos" +
- "H\001"
+ "es\030\006 \001(\010:\005false\022\036\n\017support_timeout\030\007 \001(\010" +
+ ":\005false\"S\n\016RequestResults\022&\n\010query_id\030\001 " +
+ "\001(\0132\024.exec.shared.QueryId\022\031\n\021maximum_res" +
+ "ponses\030\002 \001(\005\"q\n\010RunQuery\0221\n\014results_mode" +
+ "\030\001 \001(\0162\033.exec.user.QueryResultsMode\022$\n\004t" +
+ "ype\030\002 \001(\0162\026.exec.shared.QueryType\022\014\n\004pla" +
+ "n\030\003 \001(\t\"|\n\022BitToUserHandshake\022\023\n\013rpc_ver" +
+ "sion\030\002 \001(\005\022*\n\006status\030\003 \001(\0162\032.exec.user.H" +
+ "andshakeStatus\022\017\n\007errorId\030\004 \001(\t\022\024\n\014error" +
+ "Message\030\005 \001(\t*\310\001\n\007RpcType\022\r\n\tHANDSHAKE\020\000",
+ "\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\r\n\tRUN_QUERY\020\003\022\020\n" +
+ "\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESULTS\020\005\022\016\n\n" +
+ "QUERY_DATA\020\006\022\020\n\014QUERY_HANDLE\020\007\022\026\n\022REQ_ME" +
+ "TA_FUNCTIONS\020\010\022\026\n\022RESP_FUNCTION_LIST\020\t\022\020" +
+ "\n\014QUERY_RESULT\020\n*#\n\020QueryResultsMode\022\017\n\013" +
+ "STREAM_FULL\020\001*^\n\017HandshakeStatus\022\013\n\007SUCC" +
+ "ESS\020\001\022\030\n\024RPC_VERSION_MISMATCH\020\002\022\017\n\013AUTH_" +
+ "FAILED\020\003\022\023\n\017UNKNOWN_FAILURE\020\004B+\n\033org.apa" +
+ "che.drill.exec.protoB\nUserProtosH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4938,7 +5019,7 @@ public final class UserProtos {
internal_static_exec_user_UserToBitHandshake_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_exec_user_UserToBitHandshake_descriptor,
- new java.lang.String[] { "Channel", "SupportListening", "RpcVersion", "Credentials", "Properties", "SupportComplexTypes", });
+ new java.lang.String[] { "Channel", "SupportListening", "RpcVersion", "Credentials", "Properties", "SupportComplexTypes", "SupportTimeout", });
internal_static_exec_user_RequestResults_descriptor =
getDescriptor().getMessageTypes().get(3);
internal_static_exec_user_RequestResults_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcMode.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcMode.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcMode.java
index df71855..e34b52d 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcMode.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcMode.java
@@ -24,7 +24,9 @@ public enum RpcMode implements com.dyuproject.protostuff.EnumLite<RpcMode>
{
REQUEST(0),
RESPONSE(1),
- RESPONSE_FAILURE(2);
+ RESPONSE_FAILURE(2),
+ PING(3),
+ PONG(4);
public final int number;
@@ -45,6 +47,8 @@ public enum RpcMode implements com.dyuproject.protostuff.EnumLite<RpcMode>
case 0: return REQUEST;
case 1: return RESPONSE;
case 2: return RESPONSE_FAILURE;
+ case 3: return PING;
+ case 4: return PONG;
default: return null;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java
index 67ac4e5..efd8e58 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java
@@ -47,6 +47,7 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
static final UserToBitHandshake DEFAULT_INSTANCE = new UserToBitHandshake();
static final Boolean DEFAULT_SUPPORT_COMPLEX_TYPES = new Boolean(false);
+ static final Boolean DEFAULT_SUPPORT_TIMEOUT = new Boolean(false);
private RpcChannel channel;
private Boolean supportListening;
@@ -54,6 +55,7 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
private UserCredentials credentials;
private UserProperties properties;
private Boolean supportComplexTypes = DEFAULT_SUPPORT_COMPLEX_TYPES;
+ private Boolean supportTimeout = DEFAULT_SUPPORT_TIMEOUT;
public UserToBitHandshake()
{
@@ -140,6 +142,19 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
return this;
}
+ // supportTimeout
+
+ public Boolean getSupportTimeout()
+ {
+ return supportTimeout;
+ }
+
+ public UserToBitHandshake setSupportTimeout(Boolean supportTimeout)
+ {
+ this.supportTimeout = supportTimeout;
+ return this;
+ }
+
// java serialization
public void readExternal(ObjectInput in) throws IOException
@@ -214,6 +229,9 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
case 6:
message.supportComplexTypes = input.readBool();
break;
+ case 7:
+ message.supportTimeout = input.readBool();
+ break;
default:
input.handleUnknownField(number, this);
}
@@ -242,6 +260,9 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
if(message.supportComplexTypes != null && message.supportComplexTypes != DEFAULT_SUPPORT_COMPLEX_TYPES)
output.writeBool(6, message.supportComplexTypes, false);
+
+ if(message.supportTimeout != null && message.supportTimeout != DEFAULT_SUPPORT_TIMEOUT)
+ output.writeBool(7, message.supportTimeout, false);
}
public String getFieldName(int number)
@@ -254,6 +275,7 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
case 4: return "credentials";
case 5: return "properties";
case 6: return "supportComplexTypes";
+ case 7: return "supportTimeout";
default: return null;
}
}
@@ -273,6 +295,7 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
__fieldMap.put("credentials", 4);
__fieldMap.put("properties", 5);
__fieldMap.put("supportComplexTypes", 6);
+ __fieldMap.put("supportTimeout", 7);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/protobuf/GeneralRPC.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/GeneralRPC.proto b/protocol/src/main/protobuf/GeneralRPC.proto
index 5538abe..26ab821 100644
--- a/protocol/src/main/protobuf/GeneralRPC.proto
+++ b/protocol/src/main/protobuf/GeneralRPC.proto
@@ -14,6 +14,8 @@ enum RpcMode {
REQUEST = 0;
RESPONSE = 1;
RESPONSE_FAILURE = 2;
+ PING = 3;
+ PONG = 4;
}
message RpcHeader{
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/protobuf/User.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/User.proto b/protocol/src/main/protobuf/User.proto
index 59e22ae..185a646 100644
--- a/protocol/src/main/protobuf/User.proto
+++ b/protocol/src/main/protobuf/User.proto
@@ -44,6 +44,7 @@ message UserToBitHandshake {
optional exec.shared.UserCredentials credentials = 4;
optional UserProperties properties = 5;
optional bool support_complex_types = 6 [default = false];
+ optional bool support_timeout = 7 [default = false];
}
message RequestResults {
[2/3] drill git commit: DRILL-2971, DRILL-2886, DRILL-2778,
DRILL-2545: Improve RPC connection detection failure. Add RPC timeout.
Posted by ja...@apache.org.
DRILL-2971, DRILL-2886, DRILL-2778, DRILL-2545: Improve RPC connection detection failure. Add RPC timeout.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/960f876a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/960f876a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/960f876a
Branch: refs/heads/master
Commit: 960f876a1945eee4eeb0d6a98c5c58dfe2eea1a9
Parents: d4f9bf2
Author: Jacques Nadeau <ja...@apache.org>
Authored: Fri May 8 08:15:36 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat May 9 07:12:23 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 2 +
.../apache/drill/exec/client/DrillClient.java | 8 +-
.../org/apache/drill/exec/rpc/BasicClient.java | 110 ++++++++++----
.../exec/rpc/BasicClientWithConnection.java | 21 +--
.../org/apache/drill/exec/rpc/BasicServer.java | 78 +++++++---
.../drill/exec/rpc/CoordinationQueue.java | 2 +-
.../apache/drill/exec/rpc/RemoteConnection.java | 31 +++-
.../java/org/apache/drill/exec/rpc/RpcBus.java | 151 +++++++++++--------
.../org/apache/drill/exec/rpc/RpcConfig.java | 35 ++++-
.../drill/exec/rpc/RpcExceptionHandler.java | 12 +-
.../drill/exec/rpc/control/ControlClient.java | 14 +-
.../exec/rpc/control/ControlConnection.java | 7 +-
.../exec/rpc/control/ControlRpcConfig.java | 25 +--
.../drill/exec/rpc/control/ControlServer.java | 13 +-
.../exec/rpc/data/BitServerConnection.java | 7 +-
.../apache/drill/exec/rpc/data/DataClient.java | 11 +-
.../exec/rpc/data/DataClientConnection.java | 6 +-
.../drill/exec/rpc/data/DataRpcConfig.java | 15 +-
.../apache/drill/exec/rpc/data/DataServer.java | 11 +-
.../drill/exec/rpc/user/QueryResultHandler.java | 36 ++++-
.../apache/drill/exec/rpc/user/UserClient.java | 19 ++-
.../drill/exec/rpc/user/UserRpcConfig.java | 22 ++-
.../apache/drill/exec/rpc/user/UserServer.java | 23 ++-
.../apache/drill/exec/work/foreman/Foreman.java | 19 +++
.../src/main/resources/drill-module.conf | 2 +
.../drill/exec/proto/GeneralRPCProtos.java | 26 +++-
.../drill/exec/proto/SchemaUserProtos.java | 7 +
.../org/apache/drill/exec/proto/UserProtos.java | 123 ++++++++++++---
.../apache/drill/exec/proto/beans/RpcMode.java | 6 +-
.../exec/proto/beans/UserToBitHandshake.java | 23 +++
protocol/src/main/protobuf/GeneralRPC.proto | 2 +
protocol/src/main/protobuf/User.proto | 1 +
32 files changed, 626 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index be8c7a0..97d5770 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -42,7 +42,9 @@ public interface ExecConstants {
public static final String STORAGE_ENGINE_SCAN_PACKAGES = "drill.exec.storage.packages";
public static final String SERVICE_NAME = "drill.exec.cluster-id";
public static final String INITIAL_BIT_PORT = "drill.exec.rpc.bit.server.port";
+ public static final String BIT_RPC_TIMEOUT = "drill.exec.rpc.bit.timeout";
public static final String INITIAL_USER_PORT = "drill.exec.rpc.user.server.port";
+ public static final String USER_RPC_TIMEOUT = "drill.exec.rpc.user.timeout";
public static final String METRICS_CONTEXT_NAME = "drill.exec.metrics.context";
public static final String FUNCTION_PACKAGES = "drill.exec.functions";
public static final String USE_IP_ADDRESS = "drill.exec.rpc.use.ip";
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 9924704..136d8c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -20,20 +20,17 @@ package org.apache.drill.exec.client;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
-
-import com.google.common.base.Strings;
import io.netty.buffer.DrillBuf;
+import io.netty.channel.EventLoopGroup;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Vector;
-import io.netty.channel.EventLoopGroup;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
@@ -63,6 +60,7 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserClient;
import org.apache.drill.exec.rpc.user.UserResultsListener;
+import com.google.common.base.Strings;
import com.google.common.util.concurrent.AbstractCheckedFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -208,7 +206,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
}
eventLoopGroup = createEventLoop(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-");
- client = new UserClient(supportComplexTypes, allocator, eventLoopGroup);
+ client = new UserClient(config, supportComplexTypes, allocator, eventLoopGroup);
logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
connect(endpoint);
connected = true;
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 72ae130..1661f81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -20,18 +20,24 @@ package org.apache.drill.exec.rpc;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
import com.google.protobuf.Internal.EnumLite;
@@ -40,7 +46,12 @@ import com.google.protobuf.Parser;
public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
extends RpcBus<T, R> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
+ final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());
+
+ // The percentage of time that should pass before sending a ping message to ensure server doesn't time us out. For
+ // example, if timeout is set to 30 seconds and we set percentage to 0.5, then if no write has happened within 15
+ // seconds, the idle state handler will send a ping message.
+ private static final double PERCENT_TIMEOUT_BEFORE_SENDING_PING = 0.5;
private final Bootstrap b;
protected R connection;
@@ -48,18 +59,23 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
private final Class<HANDSHAKE_RESPONSE> responseClass;
private final Parser<HANDSHAKE_RESPONSE> handshakeParser;
+ private final IdlePingHandler pingHandler;
+
public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
super(rpcMapping);
this.responseClass = responseClass;
this.handshakeType = handshakeType;
this.handshakeParser = handshakeParser;
+ final long timeoutInMillis = rpcMapping.hasTimeout() ? (long) (rpcMapping.getTimeout() * 1000.0 * PERCENT_TIMEOUT_BEFORE_SENDING_PING)
+ : -1;
+ this.pingHandler = rpcMapping.hasTimeout() ? new IdlePingHandler(timeoutInMillis) : null;
b = new Bootstrap() //
.group(eventLoopGroup) //
.channel(TransportCheck.getClientSocketChannel()) //
.option(ChannelOption.ALLOCATOR, alloc) //
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30*1000)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_RCVBUF, 1 << 17) //
.option(ChannelOption.SO_SNDBUF, 1 << 17) //
@@ -68,40 +84,72 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
@Override
protected void initChannel(SocketChannel ch) throws Exception {
-// logger.debug("initializing client connection.");
+ // logger.debug("initializing client connection.");
connection = initRemoteConnection(ch);
- ch.closeFuture().addListener(getCloseHandler(connection));
-
- ch.pipeline().addLast( //
- getDecoder(connection.getAllocator()), //
- new RpcDecoder("c-" + rpcConfig.getName()), //
- new RpcEncoder("c-" + rpcConfig.getName()), //
- new ClientHandshakeHandler(), //
- new InboundHandler(connection), //
- new RpcExceptionHandler() //
- );
+
+ ch.closeFuture().addListener(getCloseHandler(ch, connection));
+
+ final ChannelPipeline pipe = ch.pipeline();
+
+ pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator()));
+ pipe.addLast("message-decoder", new RpcDecoder("c-" + rpcConfig.getName()));
+ pipe.addLast("protocol-encoder", new RpcEncoder("c-" + rpcConfig.getName()));
+ pipe.addLast("handshake-handler", new ClientHandshakeHandler());
+
+ if(pingHandler != null){
+ pipe.addLast("idle-state-handler", pingHandler);
+ }
+
+ pipe.addLast("message-handler", new InboundHandler(connection));
+ pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
}
}); //
-// if(TransportCheck.SUPPORTS_EPOLL){
-// b.option(EpollChannelOption.SO_REUSEPORT, true); //
-// }
+ // if(TransportCheck.SUPPORTS_EPOLL){
+ // b.option(EpollChannelOption.SO_REUSEPORT, true); //
+ // }
+ }
+
+ private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);
+
+ /**
+ * Handler that watches for situations where we haven't read from the socket in a certain timeout. If we exceed this
+ * timeout, we send a PING message to the server to state that we are still alive.
+ */
+ private class IdlePingHandler extends IdleStateHandler {
+
+ private GenericFutureListener<Future<? super Void>> pingFailedHandler = new GenericFutureListener<Future<? super Void>>() {
+ public void operationComplete(Future<? super Void> future) throws Exception {
+ if (!future.isSuccess()) {
+ logger.error("Unable to maintain connection {}. Closing connection.", connection.getName());
+ connection.close();
+ }
+ }
+ };
+
+ public IdlePingHandler(long idleWaitInMillis) {
+ super(0, idleWaitInMillis, 0, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
+ if (evt.state() == IdleState.WRITER_IDLE) {
+ ctx.writeAndFlush(PING_MESSAGE).addListener(pingFailedHandler);
+ }
+ }
}
public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator);
- public boolean isActive(){
+ public boolean isActive() {
return connection != null
&& connection.getChannel() != null
- && connection.getChannel().isActive() ;
+ && connection.getChannel().isActive();
}
protected abstract void validateHandshake(HANDSHAKE_RESPONSE validateHandshake) throws RpcException;
- protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection);
- protected GenericFutureListener<ChannelFuture> getCloseHandler(Channel channel) {
- return new ChannelClosedHandler();
- }
+ protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection);
public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener,
T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
@@ -118,7 +166,8 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
return true;
}
- protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND handshakeValue, String host, int port){
+ protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND handshakeValue,
+ String host, int port) {
ConnectionMultiListener cml = new ConnectionMultiListener(connectionListener, handshakeValue);
b.connect(host, port).addListener(cml.connectionHandler);
}
@@ -145,16 +194,17 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
@Override
public void operationComplete(ChannelFuture future) throws Exception {
-// logger.debug("Connection operation finished. Success: {}", future.isSuccess());
+ // logger.debug("Connection operation finished. Success: {}", future.isSuccess());
try {
future.get();
if (future.isSuccess()) {
- // send a handshake on the current thread. This is the only time we will send from within the event thread. We can do this because the connection will not be backed up.
+ // send a handshake on the current thread. This is the only time we will send from within the event thread.
+ // We can do this because the connection will not be backed up.
send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
} else {
l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
}
-// logger.debug("Handshake queued for send.");
+ // logger.debug("Handshake queued for send.");
} catch (Exception ex) {
l.connectionFailed(FailureType.CONNECTION, ex);
}
@@ -174,12 +224,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
@Override
public void success(HANDSHAKE_RESPONSE value, ByteBuf buffer) {
-// logger.debug("Handshake received. {}", value);
+ // logger.debug("Handshake received. {}", value);
try {
BasicClient.this.validateHandshake(value);
BasicClient.this.finalizeConnection(value, connection);
l.connectionSucceeded(connection);
-// logger.debug("Handshake completed succesfully.");
+ // logger.debug("Handshake completed succesfully.");
} catch (RpcException ex) {
l.connectionFailed(FailureType.HANDSHAKE_VALIDATION, ex);
}
@@ -205,7 +255,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
}
- public void setAutoRead(boolean enableAutoRead){
+ public void setAutoRead(boolean enableAutoRead) {
connection.setAutoRead(enableAutoRead);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index 08819ca..ab54fa1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -18,10 +18,8 @@
package org.apache.drill.exec.rpc;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.channel.socket.SocketChannel;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
@@ -35,16 +33,13 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClientWithConnection.class);
private BufferAllocator alloc;
+ private final String connectionName;
public BasicClientWithConnection(RpcConfig rpcMapping, BufferAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
- Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
+ Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser, String connectionName) {
super(rpcMapping, alloc.getUnderlyingAllocator(), eventLoopGroup, handshakeType, responseClass, handshakeParser);
this.alloc = alloc;
- }
-
- @Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(ServerConnection clientConnection) {
- return getCloseHandler(clientConnection.getChannel());
+ this.connectionName = connectionName;
}
@Override
@@ -55,16 +50,16 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
protected abstract Response handleReponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException ;
@Override
- public ServerConnection initRemoteConnection(Channel channel) {
- return new ServerConnection(channel, alloc);
+ public ServerConnection initRemoteConnection(SocketChannel channel) {
+ return new ServerConnection(connectionName, channel, alloc);
}
public static class ServerConnection extends RemoteConnection{
private final BufferAllocator alloc;
- public ServerConnection(Channel channel, BufferAllocator alloc) {
- super(channel);
+ public ServerConnection(String name, SocketChannel channel, BufferAllocator alloc) {
+ super(channel, name);
this.alloc = alloc;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 3a7032b..a148436 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -20,12 +20,13 @@ package org.apache.drill.exec.rpc;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.ReadTimeoutHandler;
import java.io.IOException;
import java.net.BindException;
@@ -44,40 +45,52 @@ import com.google.protobuf.Parser;
* requests will generate more than one outbound request.
*/
public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection> extends RpcBus<T, C> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicServer.class);
+ final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+
+ protected static final String TIMEOUT_HANDLER = "timeout-handler";
private ServerBootstrap b;
private volatile boolean connect = false;
private final EventLoopGroup eventLoopGroup;
- public BasicServer(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+ public BasicServer(final RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
super(rpcMapping);
this.eventLoopGroup = eventLoopGroup;
- b = new ServerBootstrap() //
- .channel(TransportCheck.getServerSocketChannel()) //
- .option(ChannelOption.SO_BACKLOG, 1000) //
+
+ b = new ServerBootstrap()
+ .channel(TransportCheck.getServerSocketChannel())
+ .option(ChannelOption.SO_BACKLOG, 1000)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30*1000)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_REUSEADDR, true)
- .option(ChannelOption.SO_RCVBUF, 1 << 17) //
- .option(ChannelOption.SO_SNDBUF, 1 << 17) //
+ .option(ChannelOption.SO_RCVBUF, 1 << 17)
+ .option(ChannelOption.SO_SNDBUF, 1 << 17)
.group(eventLoopGroup) //
- .childOption(ChannelOption.ALLOCATOR, alloc) //
-// .handler(new LoggingHandler(LogLevel.INFO)) //
+ .childOption(ChannelOption.ALLOCATOR, alloc)
+
+ // .handler(new LoggingHandler(LogLevel.INFO))
+
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// logger.debug("Starting initialization of server connection.");
C connection = initRemoteConnection(ch);
- ch.closeFuture().addListener(getCloseHandler(connection));
-
- ch.pipeline().addLast( //
- getDecoder(connection.getAllocator(), getOutOfMemoryHandler()), //
- new RpcDecoder("s-" + rpcConfig.getName()), //
- new RpcEncoder("s-" + rpcConfig.getName()), //
- getHandshakeHandler(connection), new InboundHandler(connection), //
- new RpcExceptionHandler() //
- );
+ ch.closeFuture().addListener(getCloseHandler(ch, connection));
+
+ final ChannelPipeline pipe = ch.pipeline();
+ pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator(), getOutOfMemoryHandler()));
+ pipe.addLast("message-decoder", new RpcDecoder("s-" + rpcConfig.getName()));
+ pipe.addLast("protocol-encoder", new RpcEncoder("s-" + rpcConfig.getName()));
+ pipe.addLast("handshake-handler", getHandshakeHandler(connection));
+
+ if (rpcMapping.hasTimeout()) {
+ pipe.addLast(TIMEOUT_HANDLER,
+ new LogggingReadTimeoutHandler(connection.getName(), rpcMapping.getTimeout()));
+ }
+
+ pipe.addLast("message-handler", new InboundHandler(connection));
+ pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
+
connect = true;
// logger.debug("Server connection initialization completed.");
}
@@ -88,10 +101,33 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
// }
}
+ private class LogggingReadTimeoutHandler extends ReadTimeoutHandler {
+
+ private final String name;
+ private final int timeoutSeconds;
+ public LogggingReadTimeoutHandler(String name, int timeoutSeconds) {
+ super(timeoutSeconds);
+ this.name = name;
+ this.timeoutSeconds = timeoutSeconds;
+ }
+
+ @Override
+ protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
+ logger.info("RPC connection {} timed out. Timeout was set to {} seconds. Closing connection.", name,
+ timeoutSeconds);
+ super.readTimedOut(ctx);
+ }
+
+ }
+
public OutOfMemoryHandler getOutOfMemoryHandler() {
return OutOfMemoryHandler.DEFAULT_INSTANCE;
}
+ protected void removeTimeoutHandler() {
+
+ }
+
public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler);
@Override
@@ -141,7 +177,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
}
@Override
- public C initRemoteConnection(Channel channel) {
+ public C initRemoteConnection(SocketChannel channel) {
return null;
}
@@ -152,7 +188,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
b.bind(++port).sync();
break;
} catch (Exception e) {
- if (e instanceof BindException && allowPortHunting){
+ if (e instanceof BindException && allowPortHunting) {
continue;
}
throw new DrillbitStartupException("Could not bind Drillbit", e);
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index 1bb65d3..5a5bbab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -86,7 +86,7 @@ public class CoordinationQueue {
if (future.channel().isActive()) {
throw new RpcException("Future failed") ;
} else {
- throw new ChannelClosedException();
+ setException(new ChannelClosedException());
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index a72dd32..0f095c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -20,6 +20,9 @@ package org.apache.drill.exec.rpc;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.ExecutionException;
@@ -30,16 +33,31 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
private final Channel channel;
private final WriteManager writeManager;
+ private final String name;
public boolean inEventLoop(){
return channel.eventLoop().inEventLoop();
}
- public RemoteConnection(Channel channel) {
+ public RemoteConnection(SocketChannel channel, String name) {
super();
this.channel = channel;
+ this.name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), name);
this.writeManager = new WriteManager();
channel.pipeline().addLast(new BackPressureHandler());
+ channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
+ public void operationComplete(Future<? super Void> future) throws Exception {
+ // this could possibly overrelease but it doesn't matter since we're only going to do this to ensure that we
+ // fail out any pending messages
+ writeManager.disable();
+ writeManager.setWritable(true);
+ }
+ });
+
+ }
+
+ public String getName() {
+ return name;
}
public abstract BufferAllocator getAllocator();
@@ -72,6 +90,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
*/
private static class WriteManager{
private final ResettableBarrier barrier = new ResettableBarrier();
+ private volatile boolean disabled = false;
public WriteManager(){
barrier.openBarrier();
@@ -82,15 +101,17 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
}
public void setWritable(boolean isWritable){
-// logger.debug("Set writable: {}", isWritable);
if(isWritable){
barrier.openBarrier();
- }else{
+ } else if (!disabled) {
barrier.closeBarrier();
}
}
+ public void disable() {
+ disabled = true;
+ }
}
private class BackPressureHandler extends ChannelInboundHandlerAdapter{
@@ -107,7 +128,9 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
@Override
public void close() {
try {
- channel.close().get();
+ if (channel.isActive()) {
+ channel.close().get();
+ }
} catch (InterruptedException | ExecutionException e) {
logger.warn("Caught exception while closing channel.", e);
// TODO InterruptedException
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index b165b53..92ce312 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -23,10 +23,12 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.Closeable;
+import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
@@ -125,25 +127,36 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
}
}
- public abstract C initRemoteConnection(Channel channel);
+ public abstract C initRemoteConnection(SocketChannel channel);
public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> {
+
+ final InetSocketAddress local;
+ final InetSocketAddress remote;
+ final C clientConnection;
+
+ public ChannelClosedHandler(C clientConnection, InetSocketAddress local, InetSocketAddress remote) {
+ this.local = local;
+ this.remote = remote;
+ this.clientConnection = clientConnection;
+ }
+
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- logger.info("Channel closed between local {} and remote {}", future.channel().localAddress(), future.channel()
- .remoteAddress());
- closeQueueDueToChannelClose();
- }
- }
+ String msg = String.format("Channel closed %s <--> %s.", local, remote);
+ if (RpcBus.this.isClient()) {
+ logger.info(String.format(msg));
+ } else {
+ queue.channelClosed(new ChannelClosedException(msg));
+ }
- protected void closeQueueDueToChannelClose() {
- if (this.isClient()) {
- queue.channelClosed(new ChannelClosedException("Queue closed due to channel closure."));
+ clientConnection.close();
}
+
}
- protected GenericFutureListener<ChannelFuture> getCloseHandler(C clientConnection) {
- return new ChannelClosedHandler();
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel channel, C clientConnection) {
+ return new ChannelClosedHandler(clientConnection, channel.localAddress(), channel.remoteAddress());
}
private class ResponseSenderImpl implements ResponseSender {
@@ -170,8 +183,11 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
}
+ private static final OutboundRpcMessage PONG = new OutboundRpcMessage(RpcMode.PONG, 0, 0, Acks.OK);
+
protected class InboundHandler extends MessageToMessageDecoder<InboundRpcMessage> {
+
private final C connection;
public InboundHandler(C connection) {
super();
@@ -179,67 +195,84 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
}
@Override
- protected void decode(ChannelHandlerContext ctx, InboundRpcMessage msg, List<Object> output) throws Exception {
+ protected void decode(final ChannelHandlerContext ctx, final InboundRpcMessage msg, final List<Object> output) throws Exception {
if (!ctx.channel().isOpen()) {
return;
}
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug("Received message {}", msg);
}
- switch (msg.mode) {
- case REQUEST: {
- // handle message and ack.
- ResponseSender sender = new ResponseSenderImpl(connection, msg.coordinationId);
- try {
- handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender);
- } catch(UserRpcException e){
- UserException uex = UserException.systemError(e).addIdentity(e.getEndpoint()).build();
-
- logger.error("Unexpected Error while handling request message", e);
-
- OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE_FAILURE, 0, msg.coordinationId,
- uex.getOrCreatePBError(false));
- if (RpcConstants.EXTRA_DEBUGGING) {
- logger.debug("Adding message to outbound buffer. {}", outMessage);
+ final Channel channel = connection.getChannel();
+
+ try{
+ switch (msg.mode) {
+ case REQUEST: {
+ // handle message and ack.
+
+ try {
+ ResponseSender sender = new ResponseSenderImpl(connection, msg.coordinationId);
+ handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender);
+ } catch (UserRpcException e) {
+ UserException uex = UserException.systemError(e).addIdentity(e.getEndpoint()).build();
+
+ logger.error("Unexpected Error while handling request message", e);
+
+ OutboundRpcMessage outMessage = new OutboundRpcMessage(
+ RpcMode.RESPONSE_FAILURE,
+ 0,
+ msg.coordinationId,
+ uex.getOrCreatePBError(false)
+ );
+
+ if (RpcConstants.EXTRA_DEBUGGING) {
+ logger.debug("Adding message to outbound buffer. {}", outMessage);
+ }
+
+ channel.writeAndFlush(outMessage);
}
- connection.getChannel().writeAndFlush(outMessage);
+ break;
}
- msg.release(); // we release our ownership. Handle could have taken over ownership.
- break;
- }
- case RESPONSE:
- try{
- MessageLite m = getResponseDefaultInstance(msg.rpcType);
- assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
- RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
- Parser<?> parser = m.getParserForType();
- Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
- rpcFuture.set(value, msg.dBody);
- msg.release(); // we release our ownership. Handle could have taken over ownership.
- if (RpcConstants.EXTRA_DEBUGGING) {
- logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
- }
- }catch(Exception ex) {
- logger.error("Failure while handling response.", ex);
- throw ex;
- }
- break;
+ case RESPONSE:
+ try {
+ MessageLite m = getResponseDefaultInstance(msg.rpcType);
+ assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
+ RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
+ Parser<?> parser = m.getParserForType();
+ Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
+ rpcFuture.set(value, msg.dBody);
+ if (RpcConstants.EXTRA_DEBUGGING) {
+ logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
+ }
+ } catch (Exception ex) {
+ logger.error("Failure while handling response.", ex);
+ throw ex;
+ }
+ break;
- case RESPONSE_FAILURE:
- DrillPBError failure = DrillPBError.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
- queue.updateFailedFuture(msg.coordinationId, failure);
- msg.release();
- if (RpcConstants.EXTRA_DEBUGGING) {
- logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId, failure);
- }
- break;
+ case RESPONSE_FAILURE:
+ DrillPBError failure = DrillPBError.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
+ queue.updateFailedFuture(msg.coordinationId, failure);
+ if (RpcConstants.EXTRA_DEBUGGING) {
+ logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId, failure);
+ }
+ break;
+
+ case PING:
+ connection.getChannel().writeAndFlush(PONG);
+ break;
+
+ case PONG:
+ // noop.
+ break;
- default:
- throw new UnsupportedOperationException();
+ default:
+ throw new UnsupportedOperationException();
+ }
+ } finally {
+ msg.release();
}
}
-
}
public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException{
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
index b5974f6..ab6c375 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc;
import java.util.Map;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.protobuf.Internal.EnumLite;
@@ -28,11 +29,14 @@ public class RpcConfig {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConfig.class);
private final String name;
+ private final int timeout;
private final Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap;
private final Map<Integer, RpcMessageType<?, ?, ?>> receiveMap;
- private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap, Map<Integer, RpcMessageType<?, ?, ?>> receiveMap) {
+ private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap,
+ Map<Integer, RpcMessageType<?, ?, ?>> receiveMap, int timeout) {
this.name = name;
+ this.timeout = timeout;
this.sendMap = ImmutableMap.copyOf(sendMap);
this.receiveMap = ImmutableMap.copyOf(receiveMap);
}
@@ -41,6 +45,13 @@ public class RpcConfig {
return name;
}
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public boolean hasTimeout() {
+ return timeout > 0;
+ }
public boolean checkReceive(int rpcType, Class<?> receiveClass) {
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug(String.format("Checking reception for rpcType %d and receive class %s.", rpcType, receiveClass));
@@ -134,17 +145,27 @@ public class RpcConfig {
}
- public static RpcConfigBuilder newBuilder(String name) {
- return new RpcConfigBuilder(name);
+ public static RpcConfigBuilder newBuilder() {
+ return new RpcConfigBuilder();
}
public static class RpcConfigBuilder {
- private final String name;
+ private String name;
+ private int timeout = -1;
private Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap = Maps.newHashMap();
private Map<Integer, RpcMessageType<?, ?, ?>> receiveMap = Maps.newHashMap();
- private RpcConfigBuilder(String name) {
+ private RpcConfigBuilder() {
+ }
+
+ public RpcConfigBuilder name(String name) {
this.name = name;
+ return this;
+ }
+
+ public RpcConfigBuilder timeout(int timeoutInSeconds) {
+ this.timeout = timeoutInSeconds;
+ return this;
}
public <SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite> RpcConfigBuilder add(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> rec) {
@@ -155,7 +176,9 @@ public class RpcConfig {
}
public RpcConfig build() {
- return new RpcConfig(name, sendMap, receiveMap);
+ Preconditions.checkArgument(timeout > -1, "Timeout must be a positive number or zero for disabled.");
+ Preconditions.checkArgument(name != null, "RpcConfig name must be set.");
+ return new RpcConfig(name, sendMap, receiveMap, timeout);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
index 537452e..c12ff7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
@@ -23,23 +23,23 @@ import io.netty.channel.ChannelHandlerContext;
public class RpcExceptionHandler implements ChannelHandler{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcExceptionHandler.class);
- public RpcExceptionHandler(){
- }
+ private final String name;
+ public RpcExceptionHandler(String name) {
+ this.name = name;
+ }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-
if(!ctx.channel().isOpen() || cause.getMessage().equals("Connection reset by peer")){
- logger.warn("Exception with closed channel", cause);
+ logger.warn("Exception occurred with closed channel. Connection: {}", name, cause);
return;
}else{
- logger.error("Exception in pipeline. Closing channel between local " + ctx.channel().localAddress() + " and remote " + ctx.channel().remoteAddress(), cause);
+ logger.error("Exception in RPC communication. Connection: {}. Closing connection.", name, cause);
ctx.close();
}
}
-
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index d546db3..f191271 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -18,8 +18,8 @@
package org.apache.drill.exec.rpc.control;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -50,7 +50,8 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
private final BufferAllocator allocator;
public ControlClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, ControlMessageHandler handler, BootStrapContext context, ControlConnectionManager.CloseHandlerCreator closeHandlerFactory) {
- super(ControlRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup(), RpcType.HANDSHAKE, BitControlHandshake.class, BitControlHandshake.PARSER);
+ super(ControlRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+ .getBitLoopGroup(), RpcType.HANDSHAKE, BitControlHandshake.class, BitControlHandshake.PARSER);
this.localIdentity = localEndpoint;
this.remoteEndpoint = remoteEndpoint;
this.handler = handler;
@@ -64,14 +65,15 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
@SuppressWarnings("unchecked")
@Override
- public ControlConnection initRemoteConnection(Channel channel) {
- this.connection = new ControlConnection(channel, (RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
+ public ControlConnection initRemoteConnection(SocketChannel channel) {
+ this.connection = new ControlConnection("control client", channel,
+ (RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
return connection;
}
@Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(ControlConnection clientConnection) {
- return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(clientConnection));
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, ControlConnection clientConnection) {
+ return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(ch, clientConnection));
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
index a7aaa9c..49f0f01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
@@ -18,7 +18,7 @@
package org.apache.drill.exec.rpc.control;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
import java.util.UUID;
@@ -41,8 +41,9 @@ public class ControlConnection extends RemoteConnection {
private volatile boolean active = false;
private final UUID id;
- public ControlConnection(Channel channel, RpcBus<RpcType, ControlConnection> bus, BufferAllocator allocator) {
- super(channel);
+ public ControlConnection(String name, SocketChannel channel, RpcBus<RpcType, ControlConnection> bus,
+ BufferAllocator allocator) {
+ super(channel, name);
this.bus = bus;
this.id = UUID.randomUUID();
this.allocator = allocator;
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index 37730e3..f92bb49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -17,7 +17,8 @@
*/
package org.apache.drill.exec.rpc.control;
-
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
@@ -34,15 +35,19 @@ import org.apache.drill.exec.rpc.RpcConfig;
public class ControlRpcConfig {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlRpcConfig.class);
- public static RpcConfig MAPPING = RpcConfig.newBuilder("BIT-CONTROL-RPC-MAPPING") //
- .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
- .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
- .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
- .add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
- .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
- .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
- .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
- .build();
+ public static RpcConfig getMapping(DrillConfig config) {
+ return RpcConfig.newBuilder()
+ .name("CONTROL")
+ .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
+ .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
+ .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
+ .build();
+ }
public static int RPC_VERSION = 3;
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index 43089d3..5e405ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -18,8 +18,8 @@
package org.apache.drill.exec.rpc.control;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -44,7 +44,8 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
private BufferAllocator allocator;
public ControlServer(ControlMessageHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry) {
- super(ControlRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
+ super(ControlRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+ .getBitLoopGroup());
this.handler = handler;
this.connectionRegistry = connectionRegistry;
this.allocator = context.getAllocator();
@@ -61,14 +62,14 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
}
@Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(ControlConnection connection) {
- this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(connection));
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, ControlConnection connection) {
+ this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection));
return proxyCloseHandler;
}
@Override
- public ControlConnection initRemoteConnection(Channel channel) {
- return new ControlConnection(channel, this, allocator);
+ public ControlConnection initRemoteConnection(SocketChannel channel) {
+ return new ControlConnection("control server", channel, this, allocator);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
index 1d539a2..44c8ddd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.rpc.data;
-import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.rpc.RemoteConnection;
@@ -26,8 +26,9 @@ public class BitServerConnection extends RemoteConnection{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServerConnection.class);
private final BufferAllocator allocator;
- public BitServerConnection(Channel channel, BufferAllocator allocator) {
- super(channel);
+
+ public BitServerConnection(SocketChannel channel, BufferAllocator allocator) {
+ super(channel, "data server");
this.allocator = allocator;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index 8e2507b..b8a07c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -18,8 +18,8 @@
package org.apache.drill.exec.rpc.data;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -47,21 +47,22 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
public DataClient(DrillbitEndpoint remoteEndpoint, BootStrapContext context, DataConnectionManager.CloseHandlerCreator closeHandlerFactory) {
- super(DataRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitClientLoopGroup(), RpcType.HANDSHAKE, BitServerHandshake.class, BitServerHandshake.PARSER);
+ super(DataRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+ .getBitClientLoopGroup(), RpcType.HANDSHAKE, BitServerHandshake.class, BitServerHandshake.PARSER);
this.remoteEndpoint = remoteEndpoint;
this.closeHandlerFactory = closeHandlerFactory;
this.allocator = context.getAllocator();
}
@Override
- public DataClientConnection initRemoteConnection(Channel channel) {
+ public DataClientConnection initRemoteConnection(SocketChannel channel) {
this.connection = new DataClientConnection(channel, this);
return connection;
}
@Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(DataClientConnection clientConnection) {
- return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(clientConnection));
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, DataClientConnection clientConnection) {
+ return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(ch, clientConnection));
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
index 3a569db..eb5778d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
@@ -18,7 +18,7 @@
package org.apache.drill.exec.rpc.data;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
import java.util.UUID;
@@ -36,8 +36,8 @@ public class DataClientConnection extends RemoteConnection{
private final DataClient client;
private final UUID id;
- public DataClientConnection(Channel channel, DataClient client) {
- super(channel);
+ public DataClientConnection(SocketChannel channel, DataClient client) {
+ super(channel, "data client");
this.client = client;
// we use a local listener pool unless a global one is provided.
this.id = UUID.randomUUID();
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
index 807b6c3..c5cf498 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
@@ -17,7 +17,8 @@
*/
package org.apache.drill.exec.rpc.data;
-
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.BitData.BitClientHandshake;
import org.apache.drill.exec.proto.BitData.BitServerHandshake;
import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
@@ -30,10 +31,14 @@ import org.apache.drill.exec.rpc.RpcConfig;
public class DataRpcConfig {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataRpcConfig.class);
- public static RpcConfig MAPPING = RpcConfig.newBuilder("BIT-DATA-RPC-MAPPING") //
- .add(RpcType.HANDSHAKE, BitClientHandshake.class, RpcType.HANDSHAKE, BitServerHandshake.class)
- .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class)
- .build();
+ public static RpcConfig getMapping(DrillConfig config) {
+ return RpcConfig.newBuilder()
+ .name("DATA")
+ .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
+ .add(RpcType.HANDSHAKE, BitClientHandshake.class, RpcType.HANDSHAKE, BitServerHandshake.class)
+ .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class)
+ .build();
+ }
public static int RPC_VERSION = 4;
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 6f8e20b..0d4077e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -19,8 +19,8 @@ package org.apache.drill.exec.rpc.data;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
@@ -57,7 +57,8 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
private final DataResponseHandler dataHandler;
public DataServer(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler) {
- super(DataRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
+ super(DataRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+ .getBitLoopGroup());
this.context = context;
this.workBus = workBus;
this.dataHandler = dataHandler;
@@ -69,13 +70,13 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
}
@Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(BitServerConnection connection) {
- this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(connection));
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, BitServerConnection connection) {
+ this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection));
return proxyCloseHandler;
}
@Override
- public BitServerConnection initRemoteConnection(Channel channel) {
+ public BitServerConnection initRemoteConnection(SocketChannel channel) {
return new BitServerConnection(channel, context.getAllocator());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 5e3e937..302be72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -19,18 +19,22 @@ package org.apache.drill.exec.rpc.user;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
-import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
@@ -63,8 +67,9 @@ public class QueryResultHandler {
private final ConcurrentMap<QueryId, UserResultsListener> queryIdToResultsListenersMap =
Maps.newConcurrentMap();
- public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener resultsListener) {
- return new SubmissionListener(resultsListener);
+ public RpcOutcomeListener<QueryId> getWrappedListener(RemoteConnection connection,
+ UserResultsListener resultsListener) {
+ return new SubmissionListener(connection, resultsListener);
}
/**
@@ -268,12 +273,31 @@ public class QueryResultHandler {
}
+
private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
- private UserResultsListener resultsListener;
+ private final UserResultsListener resultsListener;
+ private final RemoteConnection connection;
+ private final ChannelFuture closeFuture;
+ private final ChannelClosedListener closeListener;
- public SubmissionListener(UserResultsListener resultsListener) {
+ public SubmissionListener(RemoteConnection connection, UserResultsListener resultsListener) {
super();
this.resultsListener = resultsListener;
+ this.connection = connection;
+ this.closeFuture = connection.getChannel().closeFuture();
+ this.closeListener = new ChannelClosedListener();
+ closeFuture.addListener(closeListener);
+ }
+
+ private class ChannelClosedListener implements GenericFutureListener<Future<Void>> {
+
+ @Override
+ public void operationComplete(Future<Void> future) throws Exception {
+ resultsListener.submissionFailed(UserException.connectionError()
+ .message("Connection %s closed unexpectedly.", connection.getName())
+ .build());
+ }
+
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index a8bad78..b39a103 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -20,14 +20,14 @@ package org.apache.drill.exec.rpc.user;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack.Builder;
import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
import org.apache.drill.exec.proto.UserProtos.HandshakeStatus;
import org.apache.drill.exec.proto.UserProtos.RpcType;
@@ -51,13 +51,21 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
private boolean supportComplexTypes = true;
- public UserClient(boolean supportComplexTypes, BufferAllocator alloc, EventLoopGroup eventLoopGroup) {
- super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
+ public UserClient(DrillConfig config, boolean supportComplexTypes, BufferAllocator alloc,
+ EventLoopGroup eventLoopGroup) {
+ super(
+ UserRpcConfig.getMapping(config),
+ alloc,
+ eventLoopGroup,
+ RpcType.HANDSHAKE,
+ BitToUserHandshake.class,
+ BitToUserHandshake.PARSER,
+ "user client");
this.supportComplexTypes = supportComplexTypes;
}
public void submitQuery(UserResultsListener resultsListener, RunQuery query) {
- send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
+ send(queryResultHandler.getWrappedListener(connection, resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
}
public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint, UserProperties props, UserBitShared.UserCredentials credentials)
@@ -66,6 +74,7 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
.setRpcVersion(UserRpcConfig.RPC_VERSION)
.setSupportListening(true)
.setSupportComplexTypes(supportComplexTypes)
+ .setSupportTimeout(true)
.setCredentials(credentials);
if (props != null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index 88592d4..ae728d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -17,10 +17,12 @@
*/
package org.apache.drill.exec.rpc.user;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
@@ -30,13 +32,17 @@ import org.apache.drill.exec.rpc.RpcConfig;
public class UserRpcConfig {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcConfig.class);
- public static RpcConfig MAPPING = RpcConfig.newBuilder("USER-RPC-MAPPING") //
- .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) //user to bit.
- .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) //user to bit
- .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) //user to bit
- .add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) //bit to user
- .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) //bit to user
- .build();
+ public static RpcConfig getMapping(DrillConfig config) {
+ return RpcConfig.newBuilder()
+ .name("USER")
+ .timeout(config.getInt(ExecConstants.USER_RPC_TIMEOUT))
+ .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit.
+ .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) // user to bit
+ .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
+ .add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) // bit to user
+ .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) // bit to user
+ .build();
+ }
public static int RPC_VERSION = 5;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 9e929de..b3b7ae9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -17,12 +17,11 @@
*/
package org.apache.drill.exec.rpc.user;
-import com.google.common.io.Closeables;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
import java.util.UUID;
@@ -56,6 +55,7 @@ import org.apache.drill.exec.rpc.user.security.UserAuthenticator;
import org.apache.drill.exec.rpc.user.security.UserAuthenticatorFactory;
import org.apache.drill.exec.work.user.UserWorker;
+import com.google.common.io.Closeables;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
@@ -68,7 +68,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
public UserServer(DrillConfig config, BufferAllocator alloc, EventLoopGroup eventLoopGroup,
UserWorker worker) throws DrillbitStartupException {
- super(UserRpcConfig.MAPPING, alloc.getUnderlyingAllocator(), eventLoopGroup);
+ super(UserRpcConfig.getMapping(config), alloc.getUnderlyingAllocator(), eventLoopGroup);
this.worker = worker;
this.alloc = alloc;
if (config.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED)) {
@@ -123,8 +123,12 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
private UserSession session;
- public UserClientConnection(Channel channel) {
- super(channel);
+ public UserClientConnection(SocketChannel channel) {
+ super(channel, "user client");
+ }
+
+ void disableReadTimeout() {
+ getChannel().pipeline().remove(BasicServer.TIMEOUT_HANDLER);
}
void setUser(UserToBitHandshake inbound) throws IOException {
@@ -161,7 +165,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
}
@Override
- public UserClientConnection initRemoteConnection(Channel channel) {
+ public UserClientConnection initRemoteConnection(SocketChannel channel) {
return new UserClientConnection(channel);
}
@@ -186,6 +190,13 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
public BitToUserHandshake getHandshakeResponse(UserToBitHandshake inbound) throws Exception {
logger.trace("Handling handshake from user to bit. {}", inbound);
+
+ // if timeout is unsupported or is set to false, disable timeout.
+ if (!inbound.hasSupportTimeout() || !inbound.getSupportTimeout()) {
+ connection.disableReadTimeout();
+ logger.warn("Timeout Disabled as client doesn't support it.", connection.getName());
+ }
+
BitToUserHandshake.Builder respBuilder = BitToUserHandshake.newBuilder()
.setRpcVersion(UserRpcConfig.RPC_VERSION);
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 49d0c94..b7ef584 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -18,6 +18,9 @@
package org.apache.drill.exec.work.foreman;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.Collection;
@@ -121,6 +124,9 @@ public class Foreman implements Runnable {
private final ResponseSendListener responseListener = new ResponseSendListener();
private final StateSwitch stateSwitch = new StateSwitch();
private final ForemanResult foremanResult = new ForemanResult();
+ private final ConnectionClosedListener closeListener = new ConnectionClosedListener();
+ private final ChannelFuture closeFuture;
+
/**
* Constructor. Sets up the Foreman, but does not initiate any execution.
@@ -139,6 +145,9 @@ public class Foreman implements Runnable {
this.drillbitContext = drillbitContext;
initiatingClient = connection;
+ this.closeFuture = initiatingClient.getChannel().closeFuture();
+ closeFuture.addListener(closeListener);
+
queryContext = new QueryContext(connection.getSession(), drillbitContext);
queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getPersistentStoreProvider(),
stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this
@@ -146,6 +155,13 @@ public class Foreman implements Runnable {
recordNewState(QueryState.PENDING);
}
+ private class ConnectionClosedListener implements GenericFutureListener<Future<Void>> {
+ @Override
+ public void operationComplete(Future<Void> future) throws Exception {
+ cancel();
+ }
+ }
+
/**
* Get the QueryContext created for the query.
*
@@ -603,6 +619,9 @@ public class Foreman implements Runnable {
logger.info("foreman cleaning up.");
injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
+ // remove the channel disconnected listener (doesn't throw)
+ closeFuture.removeListener(closeListener);
+
// These are straight forward removals from maps, so they won't throw.
drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager.getDrillbitStatusListener());
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 8006533..d98b97a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -27,6 +27,7 @@ drill.exec: {
cluster-id: "drillbits1"
rpc: {
user: {
+ timeout: 30,
server: {
port: 31010
threads: 1
@@ -36,6 +37,7 @@ drill.exec: {
}
},
bit: {
+ timeout: 30,
server: {
port : 31011,
retry:{
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
index f47e719..c28cc29 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
@@ -42,6 +42,14 @@ public final class GeneralRPCProtos {
* <code>RESPONSE_FAILURE = 2;</code>
*/
RESPONSE_FAILURE(2, 2),
+ /**
+ * <code>PING = 3;</code>
+ */
+ PING(3, 3),
+ /**
+ * <code>PONG = 4;</code>
+ */
+ PONG(4, 4),
;
/**
@@ -56,6 +64,14 @@ public final class GeneralRPCProtos {
* <code>RESPONSE_FAILURE = 2;</code>
*/
public static final int RESPONSE_FAILURE_VALUE = 2;
+ /**
+ * <code>PING = 3;</code>
+ */
+ public static final int PING_VALUE = 3;
+ /**
+ * <code>PONG = 4;</code>
+ */
+ public static final int PONG_VALUE = 4;
public final int getNumber() { return value; }
@@ -65,6 +81,8 @@ public final class GeneralRPCProtos {
case 0: return REQUEST;
case 1: return RESPONSE;
case 2: return RESPONSE_FAILURE;
+ case 3: return PING;
+ case 4: return PONG;
default: return null;
}
}
@@ -1972,10 +1990,10 @@ public final class GeneralRPCProtos {
"rdination_id\030\002 \001(\005\022\020\n\010rpc_type\030\003 \001(\005\"b\n\022" +
"CompleteRpcMessage\022#\n\006header\030\001 \001(\0132\023.exe" +
"c.rpc.RpcHeader\022\025\n\rprotobuf_body\030\002 \001(\014\022\020" +
- "\n\010raw_body\030\003 \001(\014*:\n\007RpcMode\022\013\n\007REQUEST\020\000" +
- "\022\014\n\010RESPONSE\020\001\022\024\n\020RESPONSE_FAILURE\020\002B1\n\033" +
- "org.apache.drill.exec.protoB\020GeneralRPCP" +
- "rotosH\001"
+ "\n\010raw_body\030\003 \001(\014*N\n\007RpcMode\022\013\n\007REQUEST\020\000" +
+ "\022\014\n\010RESPONSE\020\001\022\024\n\020RESPONSE_FAILURE\020\002\022\010\n\004" +
+ "PING\020\003\022\010\n\004PONG\020\004B1\n\033org.apache.drill.exe" +
+ "c.protoB\020GeneralRPCProtosH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
index d587dfc..6fc43bb 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
@@ -280,6 +280,8 @@ public final class SchemaUserProtos
if(message.hasSupportComplexTypes())
output.writeBool(6, message.getSupportComplexTypes(), false);
+ if(message.hasSupportTimeout())
+ output.writeBool(7, message.getSupportTimeout(), false);
}
public boolean isInitialized(org.apache.drill.exec.proto.UserProtos.UserToBitHandshake message)
{
@@ -339,6 +341,9 @@ public final class SchemaUserProtos
case 6:
builder.setSupportComplexTypes(input.readBool());
break;
+ case 7:
+ builder.setSupportTimeout(input.readBool());
+ break;
default:
input.handleUnknownField(number, this);
}
@@ -385,6 +390,7 @@ public final class SchemaUserProtos
case 4: return "credentials";
case 5: return "properties";
case 6: return "supportComplexTypes";
+ case 7: return "supportTimeout";
default: return null;
}
}
@@ -402,6 +408,7 @@ public final class SchemaUserProtos
fieldMap.put("credentials", 4);
fieldMap.put("properties", 5);
fieldMap.put("supportComplexTypes", 6);
+ fieldMap.put("supportTimeout", 7);
}
}
[3/3] drill git commit: DRILL-2897: Avoid parallelization when Drill
is running limit 0 query.
Posted by ja...@apache.org.
DRILL-2897: Avoid parallelization when Drill is running limit 0 query.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9d209622
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9d209622
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9d209622
Branch: refs/heads/master
Commit: 9d209622934cb2be929a1591fd6a059165e1ac39
Parents: 960f876
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun Apr 26 22:35:57 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat May 9 08:03:59 2015 -0700
----------------------------------------------------------------------
.../exec/planner/physical/PlannerSettings.java | 7 +-
.../planner/sql/handlers/DefaultSqlHandler.java | 57 +++----
.../planner/sql/handlers/FindLimit0Visitor.java | 119 +++++++++++++++
.../org/apache/drill/TestExampleQueries.java | 1 +
.../java/org/apache/drill/TestTpchLimit0.java | 149 +++++++++++++++++++
5 files changed, 305 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/9d209622/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 7d8dd97..d4fb4f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -33,6 +33,7 @@ public class PlannerSettings implements Context{
private int numEndPoints = 0;
private boolean useDefaultCosting = false; // True: use default Optiq costing, False: use Drill costing
+ private boolean forceSingleMode;
public static final int MAX_BROADCAST_THRESHOLD = Integer.MAX_VALUE;
public static final int DEFAULT_IDENTIFIER_MAX_LENGTH = 1024;
@@ -81,7 +82,11 @@ public class PlannerSettings implements Context{
}
public boolean isSingleMode() {
- return options.getOption(EXCHANGE.getOptionName()).bool_val;
+ return forceSingleMode || options.getOption(EXCHANGE.getOptionName()).bool_val;
+ }
+
+ public void forceSingleMode() {
+ forceSingleMode = true;
}
public int numEndPoints() {
http://git-wip-us.apache.org/repos/asf/drill/blob/9d209622/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index eda1b5f..188aaa9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -22,19 +22,23 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.ProjectRemoveRule;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.TypedSqlNode;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
-
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.logical.PlanProperties;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
@@ -55,7 +59,6 @@ import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.explain.PrelSequencer;
import org.apache.drill.exec.planner.physical.visitor.ComplexToJsonPrelVisitor;
import org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier;
@@ -77,13 +80,6 @@ import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.plan.hep.HepPlanner;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.sql.SqlNode;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
@@ -215,23 +211,23 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
return rel;
}
- protected RelNode preprocessNode(RelNode rel) throws SqlUnsupportedException{
- /* Traverse the tree to do the following pre-processing tasks:
- * 1. replace the convert_from, convert_to function to actual implementations
- * Eg: convert_from(EXPR, 'JSON') be converted to convert_fromjson(EXPR);
- * TODO: Ideally all function rewrites would move here instead of DrillOptiq
+ protected RelNode preprocessNode(RelNode rel) throws SqlUnsupportedException {
+ /*
+ * Traverse the tree to do the following pre-processing tasks: 1. replace the convert_from, convert_to function to
+ * actual implementations Eg: convert_from(EXPR, 'JSON') be converted to convert_fromjson(EXPR); TODO: Ideally all
+ * function rewrites would move here instead of DrillOptiq.
*
- * 2. see where the tree contains unsupported functions;
- * throw SqlUnsupportedException if there is
+ * 2. see where the tree contains unsupported functions; throw SqlUnsupportedException if there is any.
*/
- PreProcessLogicalRel visitor = PreProcessLogicalRel.createVisitor(planner.getTypeFactory(), context.getDrillOperatorTable());
- try {
- rel = rel.accept(visitor);
- } catch(UnsupportedOperationException ex) {
- visitor.convertException();
- throw ex;
- }
+ PreProcessLogicalRel visitor = PreProcessLogicalRel.createVisitor(planner.getTypeFactory(),
+ context.getDrillOperatorTable());
+ try {
+ rel = rel.accept(visitor);
+ } catch (UnsupportedOperationException ex) {
+ visitor.convertException();
+ throw ex;
+ }
return rel;
}
@@ -243,9 +239,16 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
if (convertedRelNode instanceof DrillStoreRel) {
throw new UnsupportedOperationException();
} else {
+
+ // If the query contains a limit 0 clause, disable distributed mode since it is overkill for determining schema.
+ if (FindLimit0Visitor.containsLimit0(convertedRelNode)) {
+ context.getPlannerSettings().forceSingleMode();
+ }
+
// Put a non-trivial topProject to ensure the final output field name is preserved, when necessary.
- DrillRel topPreservedNameProj = addRenamedProject((DrillRel)convertedRelNode, validatedRowType);
- return new DrillScreenRel(topPreservedNameProj.getCluster(), topPreservedNameProj.getTraitSet(), topPreservedNameProj);
+ DrillRel topPreservedNameProj = addRenamedProject((DrillRel) convertedRelNode, validatedRowType);
+ return new DrillScreenRel(topPreservedNameProj.getCluster(), topPreservedNameProj.getTraitSet(),
+ topPreservedNameProj);
}
} catch (RelOptPlanner.CannotPlanException ex) {
logger.error(ex.getMessage());
http://git-wip-us.apache.org/repos/asf/drill/blob/9d209622/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
new file mode 100644
index 0000000..d2c5fa6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.drill.exec.planner.logical.DrillLimitRel;
+
+/**
+ * Visitor that will identify whether the root portion of the RelNode tree contains a limit 0 pattern. In this case, we
+ * inform the planner settings that this plan should be run as a single node plan to reduce the overhead associated with
+ * executing a schema-only query.
+ */
+public class FindLimit0Visitor extends RelShuttleImpl {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FindLimit0Visitor.class);
+
+ private boolean contains = false;
+
+ public static boolean containsLimit0(RelNode rel) {
+ FindLimit0Visitor visitor = new FindLimit0Visitor();
+ rel.accept(visitor);
+ return visitor.isContains();
+ }
+
+ private FindLimit0Visitor() {
+ }
+
+ boolean isContains() {
+ return contains;
+ }
+
+ private boolean isLimit0(RexNode fetch) {
+ if (fetch != null && fetch.isA(SqlKind.LITERAL)) {
+ RexLiteral l = (RexLiteral) fetch;
+ switch (l.getTypeName()) {
+ case BIGINT:
+ case INTEGER:
+ case DECIMAL:
+ if (((long) l.getValue2()) == 0) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public RelNode visit(LogicalSort sort) {
+ if (isLimit0(sort.fetch)) {
+ contains = true;
+ return sort;
+ }
+
+ return super.visit(sort);
+ }
+
+ @Override
+ public RelNode visit(RelNode other) {
+ if (other instanceof DrillLimitRel) {
+ if (isLimit0(((DrillLimitRel) other).getFetch())) {
+ contains = true;
+ return other;
+ }
+ }
+
+ return super.visit(other);
+ }
+
+ // The following set of RelNodes should terminate a search for the limit 0 pattern as they want convey its meaning.
+
+ @Override
+ public RelNode visit(LogicalAggregate aggregate) {
+ return aggregate;
+ }
+
+ @Override
+ public RelNode visit(LogicalIntersect intersect) {
+ return intersect;
+ }
+
+ @Override
+ public RelNode visit(LogicalJoin join) {
+ return join;
+ }
+
+ @Override
+ public RelNode visit(LogicalMinus minus) {
+ return minus;
+ }
+
+ @Override
+ public RelNode visit(LogicalUnion union) {
+ return union;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/9d209622/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 7e07500..18336cf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -284,6 +284,7 @@ public class TestExampleQueries extends BaseTestQuery{
}
@Test
+ @Ignore("DRILL-3004")
public void testJoin() throws Exception{
test("alter session set `planner.enable_hashjoin` = false");
test("SELECT\n" +
http://git-wip-us.apache.org/repos/asf/drill/blob/9d209622/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
new file mode 100644
index 0000000..22471c8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestTpchLimit0 extends BaseTestQuery{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchLimit0.class);
+
+ private void testLimitZero(String fileName) throws Exception {
+ String query = getFile(fileName);
+ query = "ALTER SESSION SET `planner.slice_target` = 1; select * from \n(" + query.replace(";", ")xyz limit 0;");
+ test(query);
+ }
+
+ @Test
+ public void tpch01() throws Exception{
+ testLimitZero("queries/tpch/01.sql");
+ }
+
+ @Test
+ @Ignore // DRILL-512
+ public void tpch02() throws Exception{
+ testLimitZero("queries/tpch/02.sql");
+ }
+
+ @Test
+ public void tpch03() throws Exception{
+ testLimitZero("queries/tpch/03.sql");
+ }
+
+ @Test
+ public void tpch04() throws Exception{
+ testLimitZero("queries/tpch/04.sql");
+ }
+
+ @Test
+ public void tpch05() throws Exception{
+ testLimitZero("queries/tpch/05.sql");
+ }
+
+ @Test
+ public void tpch06() throws Exception{
+ testLimitZero("queries/tpch/06.sql");
+ }
+
+ @Test
+ public void tpch07() throws Exception{
+ testLimitZero("queries/tpch/07.sql");
+ }
+
+ @Test
+ public void tpch08() throws Exception{
+ testLimitZero("queries/tpch/08.sql");
+ }
+
+ @Test
+ public void tpch09() throws Exception{
+ testLimitZero("queries/tpch/09.sql");
+ }
+
+ @Test
+ public void tpch10() throws Exception{
+ testLimitZero("queries/tpch/10.sql");
+ }
+
+ @Test
+ @Ignore // Cartesian problem
+ public void tpch11() throws Exception{
+ testLimitZero("queries/tpch/11.sql");
+ }
+
+ @Test
+ public void tpch12() throws Exception{
+ testLimitZero("queries/tpch/12.sql");
+ }
+
+ @Test
+ public void tpch13() throws Exception{
+ testLimitZero("queries/tpch/13.sql");
+ }
+
+ @Test
+ public void tpch14() throws Exception{
+ testLimitZero("queries/tpch/14.sql");
+ }
+
+ @Test
+ @Ignore //
+ public void tpch15() throws Exception{
+ testLimitZero("queries/tpch/15.sql");
+ }
+
+ @Test
+ @Ignore // invalid plan, due to Nulls value NOT IN sub-q
+ public void tpch16() throws Exception{
+ testLimitZero("queries/tpch/16.sql");
+ }
+
+ @Test
+ @Ignore //
+ public void tpch17() throws Exception{
+ testLimitZero("queries/tpch/17.sql");
+ }
+
+ @Test
+ public void tpch18() throws Exception{
+ testLimitZero("queries/tpch/18.sql");
+ }
+
+ @Test
+ @Ignore // DRILL-519
+ public void tpch19() throws Exception{
+ testLimitZero("queries/tpch/19.sql");
+ }
+
+ @Test
+ public void tpch20() throws Exception{
+ testLimitZero("queries/tpch/20.sql");
+ }
+
+ @Test
+ @Ignore
+ public void tpch21() throws Exception{
+ testLimitZero("queries/tpch/21.sql");
+ }
+
+ @Test
+ @Ignore // DRILL-518
+ public void tpch22() throws Exception{
+ testLimitZero("queries/tpch/22.sql");
+ }
+}