You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/04/25 01:22:37 UTC

[02/11] drill git commit: DRILL-2811: Allow direct connection to drillbit from DrillClient

DRILL-2811: Allow direct connection to drillbit from DrillClient


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a6256b17
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a6256b17
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a6256b17

Branch: refs/heads/master
Commit: a6256b175e1a75fbc323babac29c52401d975842
Parents: 25ea3e8
Author: Parth Chandra <pc...@maprtech.com>
Authored: Tue Apr 21 22:18:14 2015 -0700
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Fri Apr 24 16:21:54 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/client/DrillClient.java   | 73 ++++++++++++++------
 .../drill/jdbc/DrillConnectionConfig.java       |  5 ++
 .../apache/drill/jdbc/DrillConnectionImpl.java  |  5 ++
 3 files changed, 61 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a6256b17/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 0d29f60..ae0f580 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -82,25 +82,45 @@ public class DrillClient implements Closeable, ConnectionThrottle {
   private boolean supportComplexTypes;
   private final boolean ownsZkConnection;
   private final boolean ownsAllocator;
+  private final boolean isDirectConnection; // true if the connection bypasses zookeeper and connects directly to a drillbit
   private EventLoopGroup eventLoopGroup;
 
   public DrillClient() {
-    this(DrillConfig.create());
+    this(DrillConfig.create(), false);
+  }
+
+  public DrillClient(boolean isDirect) {
+    this(DrillConfig.create(), isDirect);
   }
 
   public DrillClient(String fileName) {
-    this(DrillConfig.create(fileName));
+    this(DrillConfig.create(fileName), false);
   }
 
   public DrillClient(DrillConfig config) {
-    this(config, null);
+    this(config, null, false);
+  }
+
+  public DrillClient(DrillConfig config, boolean isDirect) {
+    this(config, null, isDirect);
   }
 
   public DrillClient(DrillConfig config, ClusterCoordinator coordinator) {
-    this(config, coordinator, null);
+    this(config, coordinator, null, false);
+  }
+
+  public DrillClient(DrillConfig config, ClusterCoordinator coordinator, boolean isDirect) {
+    this(config, coordinator, null, isDirect);
   }
 
   public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator) {
+    this(config, coordinator, allocator, false);
+  }
+
+  public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator, boolean isDirect) {
+    // If isDirect is true, the client connects directly to the drillbit instead of
+    // going through ZooKeeper.
+    this.isDirectConnection = isDirect;
     this.ownsZkConnection = coordinator == null;
     this.ownsAllocator = allocator == null;
     this.allocator = ownsAllocator ? new TopLevelAllocator(config) : allocator;
@@ -151,30 +171,39 @@ public class DrillClient implements Closeable, ConnectionThrottle {
       return;
     }
 
-    if (ownsZkConnection) {
-      try {
-        this.clusterCoordinator = new ZKClusterCoordinator(this.config, connect);
-        this.clusterCoordinator.start(10000);
-      } catch (Exception e) {
-        throw new RpcException("Failure setting up ZK for client.", e);
+    final DrillbitEndpoint endpoint;
+    if (isDirectConnection) {
+      String[] connectInfo = props.getProperty("drillbit").split(":");
+      endpoint = DrillbitEndpoint.newBuilder()
+              .setAddress(connectInfo[0])
+              .setUserPort(Integer.parseInt(connectInfo[1]))
+              .build();
+    } else {
+      if (ownsZkConnection) {
+        try {
+          this.clusterCoordinator = new ZKClusterCoordinator(this.config, connect);
+          this.clusterCoordinator.start(10000);
+        } catch (Exception e) {
+          throw new RpcException("Failure setting up ZK for client.", e);
+        }
       }
-    }
 
-    if (props != null) {
-      UserProperties.Builder upBuilder = UserProperties.newBuilder();
-      for (String key : props.stringPropertyNames()) {
-        upBuilder.addProperties(Property.newBuilder().setKey(key).setValue(props.getProperty(key)));
+      if (props != null) {
+        UserProperties.Builder upBuilder = UserProperties.newBuilder();
+        for (String key : props.stringPropertyNames()) {
+          upBuilder.addProperties(Property.newBuilder().setKey(key).setValue(props.getProperty(key)));
+        }
+
+        this.props = upBuilder.build();
       }
 
-      this.props = upBuilder.build();
+      ArrayList<DrillbitEndpoint> endpoints = new ArrayList<>(clusterCoordinator.getAvailableEndpoints());
+      checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
+      // shuffle the collection then get the first endpoint
+      Collections.shuffle(endpoints);
+      endpoint = endpoints.iterator().next();
     }
 
-    ArrayList<DrillbitEndpoint> endpoints = new ArrayList<>(clusterCoordinator.getAvailableEndpoints());
-    checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
-    // shuffle the collection then get the first endpoint
-    Collections.shuffle(endpoints);
-    DrillbitEndpoint endpoint = endpoints.iterator().next();
-
     eventLoopGroup = createEventLoop(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-");
     client = new UserClient(supportComplexTypes, allocator, eventLoopGroup);
     logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());

http://git-wip-us.apache.org/repos/asf/drill/blob/a6256b17/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java
index de08cda..e353c71 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java
@@ -35,6 +35,11 @@ public class DrillConnectionConfig extends ConnectionConfigImpl {
     return "local".equals(props.getProperty("zk"));
   }
 
+  // True if the URL names a drillbit to connect to directly (has a "drillbit" property), bypassing ZooKeeper
+  public boolean isDirect(){
+    return props.getProperty("drillbit")!=null;
+  }
+
   // TODO: Check: Shouldn't something validate that URL has "zk" parameter?
   public String getZookeeperConnectionString(){
     return props.getProperty("zk");

http://git-wip-us.apache.org/repos/asf/drill/blob/a6256b17/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
index 3fdbf84..2b18afb 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
@@ -110,6 +110,11 @@ public abstract class DrillConnectionImpl extends AvaticaConnection implements D
 
         this.client = new DrillClient(dConfig, set.getCoordinator());
         this.client.connect(null, info);
+      } else if(config.isDirect()) {
+        final DrillConfig dConfig = DrillConfig.forClient();
+        this.allocator = new TopLevelAllocator(dConfig);
+        this.client = new DrillClient(true); // Get a direct connection
+        this.client.connect(config.getZookeeperConnectionString(), info);
       } else {
         final DrillConfig dConfig = DrillConfig.forClient();
         this.allocator = new TopLevelAllocator(dConfig);