You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/04/25 01:22:37 UTC
[02/11] drill git commit: DRILL-2811: Allow direct connection to drillbit from DrillClient
DRILL-2811: Allow direct connection to drillbit from DrillClient
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a6256b17
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a6256b17
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a6256b17
Branch: refs/heads/master
Commit: a6256b175e1a75fbc323babac29c52401d975842
Parents: 25ea3e8
Author: Parth Chandra <pc...@maprtech.com>
Authored: Tue Apr 21 22:18:14 2015 -0700
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Fri Apr 24 16:21:54 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/client/DrillClient.java | 73 ++++++++++++++------
.../drill/jdbc/DrillConnectionConfig.java | 5 ++
.../apache/drill/jdbc/DrillConnectionImpl.java | 5 ++
3 files changed, 61 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/a6256b17/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 0d29f60..ae0f580 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -82,25 +82,45 @@ public class DrillClient implements Closeable, ConnectionThrottle {
private boolean supportComplexTypes;
private final boolean ownsZkConnection;
private final boolean ownsAllocator;
+ private final boolean isDirectConnection; // true if the connection bypasses zookeeper and connects directly to a drillbit
private EventLoopGroup eventLoopGroup;
public DrillClient() {
- this(DrillConfig.create());
+ this(DrillConfig.create(), false);
+ }
+
+ public DrillClient(boolean isDirect) {
+ this(DrillConfig.create(), isDirect);
}
public DrillClient(String fileName) {
- this(DrillConfig.create(fileName));
+ this(DrillConfig.create(fileName), false);
}
public DrillClient(DrillConfig config) {
- this(config, null);
+ this(config, null, false);
+ }
+
+ public DrillClient(DrillConfig config, boolean isDirect) {
+ this(config, null, isDirect);
}
public DrillClient(DrillConfig config, ClusterCoordinator coordinator) {
- this(config, coordinator, null);
+ this(config, coordinator, null, false);
+ }
+
+ public DrillClient(DrillConfig config, ClusterCoordinator coordinator, boolean isDirect) {
+ this(config, coordinator, null, isDirect);
}
public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator) {
+ this(config, coordinator, allocator, false);
+ }
+
+ public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator, boolean isDirect) {
+ // If isDirect is true, the client connects directly to the drillbit instead of
+ // going through ZooKeeper.
+ this.isDirectConnection = isDirect;
this.ownsZkConnection = coordinator == null;
this.ownsAllocator = allocator == null;
this.allocator = ownsAllocator ? new TopLevelAllocator(config) : allocator;
@@ -151,30 +171,39 @@ public class DrillClient implements Closeable, ConnectionThrottle {
return;
}
- if (ownsZkConnection) {
- try {
- this.clusterCoordinator = new ZKClusterCoordinator(this.config, connect);
- this.clusterCoordinator.start(10000);
- } catch (Exception e) {
- throw new RpcException("Failure setting up ZK for client.", e);
+ final DrillbitEndpoint endpoint;
+ if (isDirectConnection) {
+ String[] connectInfo = props.getProperty("drillbit").split(":");
+ endpoint = DrillbitEndpoint.newBuilder()
+ .setAddress(connectInfo[0])
+ .setUserPort(Integer.parseInt(connectInfo[1]))
+ .build();
+ } else {
+ if (ownsZkConnection) {
+ try {
+ this.clusterCoordinator = new ZKClusterCoordinator(this.config, connect);
+ this.clusterCoordinator.start(10000);
+ } catch (Exception e) {
+ throw new RpcException("Failure setting up ZK for client.", e);
+ }
}
- }
- if (props != null) {
- UserProperties.Builder upBuilder = UserProperties.newBuilder();
- for (String key : props.stringPropertyNames()) {
- upBuilder.addProperties(Property.newBuilder().setKey(key).setValue(props.getProperty(key)));
+ if (props != null) {
+ UserProperties.Builder upBuilder = UserProperties.newBuilder();
+ for (String key : props.stringPropertyNames()) {
+ upBuilder.addProperties(Property.newBuilder().setKey(key).setValue(props.getProperty(key)));
+ }
+
+ this.props = upBuilder.build();
}
- this.props = upBuilder.build();
+ ArrayList<DrillbitEndpoint> endpoints = new ArrayList<>(clusterCoordinator.getAvailableEndpoints());
+ checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
+ // shuffle the collection then get the first endpoint
+ Collections.shuffle(endpoints);
+ endpoint = endpoints.iterator().next();
}
- ArrayList<DrillbitEndpoint> endpoints = new ArrayList<>(clusterCoordinator.getAvailableEndpoints());
- checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
- // shuffle the collection then get the first endpoint
- Collections.shuffle(endpoints);
- DrillbitEndpoint endpoint = endpoints.iterator().next();
-
eventLoopGroup = createEventLoop(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-");
client = new UserClient(supportComplexTypes, allocator, eventLoopGroup);
logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
http://git-wip-us.apache.org/repos/asf/drill/blob/a6256b17/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java
index de08cda..e353c71 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java
@@ -35,6 +35,11 @@ public class DrillConnectionConfig extends ConnectionConfigImpl {
return "local".equals(props.getProperty("zk"));
}
+ // True if the URL points directly to a drillbit
+ public boolean isDirect(){
+ return props.getProperty("local")!=null;
+ }
+
// TODO: Check: Shouldn't something validate that URL has "zk" parameter?
public String getZookeeperConnectionString(){
return props.getProperty("zk");
http://git-wip-us.apache.org/repos/asf/drill/blob/a6256b17/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
index 3fdbf84..2b18afb 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
@@ -110,6 +110,11 @@ public abstract class DrillConnectionImpl extends AvaticaConnection implements D
this.client = new DrillClient(dConfig, set.getCoordinator());
this.client.connect(null, info);
+ } else if(config.isDirect()) {
+ final DrillConfig dConfig = DrillConfig.forClient();
+ this.allocator = new TopLevelAllocator(dConfig);
+ this.client = new DrillClient(true); // Get a direct connection
+ this.client.connect(config.getZookeeperConnectionString(), info);
} else {
final DrillConfig dConfig = DrillConfig.forClient();
this.allocator = new TopLevelAllocator(dConfig);