You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/06/02 22:53:25 UTC

[3/3] drill git commit: DRILL-3159: Part 2--Core: Make JDBC throttling threshold configurable.

DRILL-3159: Part 2--Core: Make JDBC throttling threshold configurable.

Added configuration/option "drill.jdbc.batch_queue_throttling_threshold".
Applied "drill.jdbc.batch_queue_throttling_threshold" to DrillResultSetImpl.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/acf5566e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/acf5566e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/acf5566e

Branch: refs/heads/master
Commit: acf5566e81ca3fafe83309188499aabb2091b945
Parents: 0c69631
Author: dbarclay <db...@maprtech.com>
Authored: Thu May 14 15:36:14 2015 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Tue Jun 2 12:26:11 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  3 +++
 .../src/main/resources/drill-module.conf        |  4 ++++
 .../drill/jdbc/impl/DrillResultSetImpl.java     | 24 ++++++++++++++++----
 3 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index be67f9d..91793f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -87,6 +87,9 @@ public interface ExecConstants {
   public static final String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl";
   public static final String PAM_AUTHENTICATOR_PROFILES = "drill.exec.security.user.auth.pam_profiles";
   public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak";
+  /** Size of JDBC batch queue (in batches) above which throttling begins. */
+  public static final String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
+      "drill.jdbc.batch_queue_throttling_threshold";
 
   /**
    * Currently if a query is cancelled, but one of the fragments reports the status as FAILED instead of CANCELLED or

http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 66055f1..dbe449a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -163,3 +163,7 @@ drill.exec: {
     return_error_for_failure_in_cancelled_fragments: false
   }
 }
+
+drill.jdbc: {
+  batch_queue_throttling_threshold: 100
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 385ccf5..cb6bd1d 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -31,6 +31,7 @@ import net.hydromatic.avatica.AvaticaResultSet;
 import net.hydromatic.avatica.AvaticaStatement;
 
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
@@ -63,7 +64,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
   // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
   public SchemaChangeListener changeListener;
   // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
-  public final ResultsListener resultsListener = new ResultsListener();
+  public final ResultsListener resultsListener;
   private final DrillClient client;
   // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
   // TODO:  Resolve:  Since is barely manipulated here in DrillResultSetImpl,
@@ -77,6 +78,10 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
                             ResultSetMetaData resultSetMetaData, TimeZone timeZone) {
     super(statement, prepareResult, resultSetMetaData, timeZone);
     this.statement = statement;
+    final int batchQueueThrottlingThreshold =
+        this.getStatement().getConnection().getClient().getConfig().getInt(
+            ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD );
+    resultsListener = new ResultsListener( batchQueueThrottlingThreshold );
     DrillConnection c = (DrillConnection) statement.getConnection();
     DrillClient client = c.getClient();
     currentBatch = new RecordBatchLoader(client.getAllocator());
@@ -188,12 +193,13 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
   public static class ResultsListener implements UserResultsListener {
     private static final Logger logger = getLogger( ResultsListener.class );
 
-    private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100;
     private static volatile int nextInstanceId = 1;
 
     /** (Just for logging.) */
     private final int instanceId;
 
+    private final int batchQueueThrottlingThreshold;
+
     /** (Just for logging.) */
     private volatile QueryId queryId;
 
@@ -225,8 +231,14 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
         Queues.newLinkedBlockingDeque();
 
 
-    ResultsListener() {
+    /**
+     * Creates a results listener that throttles the server based on the given batch queue size threshold.
+     * @param  batchQueueThrottlingThreshold
+     *         queue size threshold for throttling server
+     */
+    ResultsListener( int batchQueueThrottlingThreshold ) {
       instanceId = nextInstanceId++;
+      this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
       logger.debug( "[#{}] Query listener created.", instanceId );
     }
 
@@ -245,7 +257,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
     }
 
     /**
-     * Stops throttling if currently active.
+     * Stops throttling if currently throttling.
      * @return  true if actually stopped (was throttling)
      */
     private boolean stopThrottlingIfSo() {
@@ -300,7 +312,9 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
 
       // We're active; let's add to the queue.
       batchQueue.add(result);
-      if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) {
+
+      // Throttle server if queue size has exceeded threshold.
+      if (batchQueue.size() > batchQueueThrottlingThreshold ) {
         if ( startThrottlingIfNot( throttle ) ) {
           logger.debug( "[#{}] Throttling started at queue size {}.",
                         instanceId, batchQueue.size() );