You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/06/02 22:53:24 UTC

[2/3] drill git commit: DRILL-3159: Part 1--Prep., Hyg. for: Make JDBC throttling threshold configurable.

DRILL-3159: Part 1--Prep., Hyg. for: Make JDBC throttling threshold configurable.

Cleaned, enhanced DrillResultSet:
- Enhanced ResultsListener logging:
  - Added instance ID; added batch numbers.
  - Added logging at close (pairing with logging at construction).
  - Fixed 2-integer query ID to UUID form.
- Renamed qrb -> qdb; q -> qdb (per recent QueryDataBatch change).
- Added "final" on ResultsListener's logger.

Reduced Avatica-vs.-Drill casting:
- DrillStatementImpl's (Drill)Connection(Impl).
- DrillResultSetImpl's (Drill)Statement(Impl).

Converted a trailing comment to a Javadoc comment in ExecConstants.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0c69631f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0c69631f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0c69631f

Branch: refs/heads/master
Commit: 0c69631fb9e7ef19e503645d6433584a12de23d4
Parents: 71199ed
Author: dbarclay <db...@maprtech.com>
Authored: Wed May 20 18:16:23 2015 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Tue Jun 2 12:26:02 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  3 +-
 .../apache/drill/jdbc/DrillJdbc41Factory.java   | 11 ++-
 .../drill/jdbc/impl/DrillResultSetImpl.java     | 84 +++++++++++++-------
 .../drill/jdbc/impl/DrillStatementImpl.java     |  5 +-
 4 files changed, 71 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 8a24e8d..be67f9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -60,7 +60,8 @@ public interface ExecConstants {
   public static final String TEMP_DIRECTORIES = "drill.exec.tmp.directories";
   public static final String TEMP_FILESYSTEM = "drill.exec.tmp.filesystem";
   public static final String INCOMING_BUFFER_IMPL = "drill.exec.buffer.impl";
-  public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size"; // incoming buffer size (number of batches)
+  /** incoming buffer size (number of batches) */
+  public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size";
   public static final String SPOOLING_BUFFER_DELETE = "drill.exec.buffer.spooling.delete";
   public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
   public static final String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";

http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
index 6240b62..93fe59d 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
@@ -82,9 +82,14 @@ public class DrillJdbc41Factory extends DrillFactory {
   }
 
   @Override
-  public DrillResultSetImpl newResultSet(AvaticaStatement statement, AvaticaPrepareResult prepareResult, TimeZone timeZone) {
-    final ResultSetMetaData metaData = newResultSetMetaData(statement, prepareResult.getColumnList());
-    return new DrillResultSetImpl(statement, (DrillPrepareResult) prepareResult, metaData, timeZone);
+  public DrillResultSetImpl newResultSet( AvaticaStatement statement,
+                                          AvaticaPrepareResult prepareResult,
+                                          TimeZone timeZone ) {
+    final ResultSetMetaData metaData =
+        newResultSetMetaData(statement, prepareResult.getColumnList());
+    return new DrillResultSetImpl( (DrillStatementImpl) statement,
+                                   (DrillPrepareResult) prepareResult,
+                                   metaData, timeZone);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 4fa1f2f..385ccf5 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -58,6 +58,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
   @SuppressWarnings("unused")
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSetImpl.class);
 
+  private final DrillStatementImpl statement;
+
   // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
   public SchemaChangeListener changeListener;
   // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
@@ -71,17 +73,21 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
   public final DrillCursor cursor;
   public boolean hasPendingCancelationNotification;
 
-  public DrillResultSetImpl(AvaticaStatement statement, AvaticaPrepareResult prepareResult,
+  public DrillResultSetImpl(DrillStatementImpl statement, AvaticaPrepareResult prepareResult,
                             ResultSetMetaData resultSetMetaData, TimeZone timeZone) {
     super(statement, prepareResult, resultSetMetaData, timeZone);
+    this.statement = statement;
     DrillConnection c = (DrillConnection) statement.getConnection();
     DrillClient client = c.getClient();
-    // DrillClient client, DrillStatement statement) {
     currentBatch = new RecordBatchLoader(client.getAllocator());
     this.client = client;
     cursor = new DrillCursor(this);
   }
 
+  public DrillStatementImpl getStatement() {
+    return statement;
+  }
+
   /**
    * Throws AlreadyClosedSqlException or QueryCanceledSqlException if this
    * ResultSet is closed.
@@ -171,8 +177,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
   }
 
   public String getQueryId() {
-    if (resultsListener.queryId != null) {
-      return QueryIdHelper.getQueryId(resultsListener.queryId);
+    if (resultsListener.getQueryId() != null) {
+      return QueryIdHelper.getQueryId(resultsListener.getQueryId());
     } else {
       return null;
     }
@@ -180,12 +186,22 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
 
   // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
   public static class ResultsListener implements UserResultsListener {
-    private static Logger logger = getLogger( ResultsListener.class );
+    private static final Logger logger = getLogger( ResultsListener.class );
 
     private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100;
+    private static volatile int nextInstanceId = 1;
 
+    /** (Just for logging.) */
+    private final int instanceId;
+
+    /** (Just for logging.) */
     private volatile QueryId queryId;
 
+    /** (Just for logging.) */
+    private int lastReceivedBatchNumber;
+    /** (Just for logging.) */
+    private int lastDequeuedBatchNumber;
+
     private volatile UserException executionFailureException;
 
     // TODO:  Revisit "completed".  Determine and document exactly what it
@@ -210,7 +226,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
 
 
     ResultsListener() {
-      logger.debug( "Query listener created." );
+      instanceId = nextInstanceId++;  // NOTE(review): ++ on a volatile int is not atomic; concurrently created listeners could share an ID (ID is used only for logging)
+      logger.debug( "[#{}] Query listener created.", instanceId );
     }
 
     /**
@@ -252,22 +269,25 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
 
     @Override
     public void queryIdArrived(QueryId queryId) {
-      logger.debug( "Received query ID: {}.", queryId );
+      logger.debug( "[#{}] Received query ID: {}.",
+                    instanceId, QueryIdHelper.getQueryId( queryId ) );
       this.queryId = queryId;
     }
 
     @Override
     public void submissionFailed(UserException ex) {
-      logger.debug( "Received query failure:", ex );
+      logger.debug( "[#{}] Received query failure:", instanceId, ex );  // trailing ex is logged as the throwable
       this.executionFailureException = ex;
       completed = true;
       close();
-      logger.info( "Query failed: ", ex );
+      logger.info( "[#{}] Query failed: ", instanceId, ex );
     }
 
     @Override
     public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
-      logger.debug( "Received query data batch: {}.", result );
+      lastReceivedBatchNumber++;
+      logger.debug( "[#{}] Received query data batch #{}: {}.",
+                    instanceId, lastReceivedBatchNumber, result );
 
       // If we're in a closed state, just release the message.
       if (closed) {
@@ -282,7 +302,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
       batchQueue.add(result);
       if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) {
         if ( startThrottlingIfNot( throttle ) ) {
-          logger.debug( "Throttling started at queue size {}.", batchQueue.size() );
+          logger.debug( "[#{}] Throttling started at queue size {}.",
+                        instanceId, batchQueue.size() );
         }
       }
 
@@ -291,7 +312,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
 
     @Override
     public void queryCompleted(QueryState state) {
-      logger.debug( "Received query completion: {}.", state );
+      logger.debug( "[#{}] Received query completion: {}.", instanceId, state );
       releaseIfFirst();
       completed = true;
     }
@@ -313,41 +334,50 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
                                            InterruptedException {
       while (true) {
         if (executionFailureException != null) {
-          logger.debug( "Dequeued query failure exception: {}.", executionFailureException );
+          logger.debug( "[#{}] Dequeued query failure exception: {}.",
+                        instanceId, executionFailureException );
           throw executionFailureException;
         }
         if (completed && batchQueue.isEmpty()) {
           return null;
         } else {
-          QueryDataBatch q = batchQueue.poll(50, TimeUnit.MILLISECONDS);
-          if (q != null) {
-            assert THROTTLING_QUEUE_SIZE_THRESHOLD >= 2;
-            if (batchQueue.size() < THROTTLING_QUEUE_SIZE_THRESHOLD / 2) {
+          QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
+          if (qdb != null) {
+            lastDequeuedBatchNumber++;
+            logger.debug( "[#{}] Dequeued query data batch #{}: {}.",
+                          instanceId, lastDequeuedBatchNumber, qdb );
+
+            // Unthrottle server if queue size has dropped enough below threshold:
+            if ( batchQueue.size() < batchQueueThrottlingThreshold / 2
+                 || batchQueue.size() == 0  // (in case threshold < 2)
+                 ) {
               if ( stopThrottlingIfSo() ) {
-                logger.debug( "Throttling stopped at queue size {}.",
-                              batchQueue.size() );
+                logger.debug( "[#{}] Throttling stopped at queue size {}.",
+                              instanceId, batchQueue.size() );
               }
             }
-            logger.debug( "Dequeued query data batch: {}.", q );
-            return q;
+            return qdb;
           }
         }
       }
     }
 
     void close() {
+      logger.debug( "[#{}] Query listener closing.", instanceId );
       closed = true;
       if ( stopThrottlingIfSo() ) {
-        logger.debug( "Throttling stopped at close() (at queue size {}).", batchQueue.size() );
+        logger.debug( "[#{}] Throttling stopped at close() (at queue size {}).",
+                      instanceId, batchQueue.size() );
       }
       while (!batchQueue.isEmpty()) {
-        QueryDataBatch qrb = batchQueue.poll();
-        if (qrb != null && qrb.getData() != null) {
-          qrb.getData().release();
+        QueryDataBatch qdb = batchQueue.poll();
+        if (qdb != null && qdb.getData() != null) {
+          qdb.getData().release();
         }
       }
-      // close may be called before the first result is received and the main thread is blocked waiting
-      // for the result. In that case we want to unblock the main thread.
+      // Close may be called before the first result is received and therefore
+      // when the main thread is blocked waiting for the result.  In that case
+      // we want to unblock the main thread.
       latch.countDown(); // TODO:  Why not call releaseIfFirst as used elsewhere?
       completed = true;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
index 5160c31..6610f52 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
@@ -33,9 +33,12 @@ import net.hydromatic.avatica.AvaticaStatement;
 public abstract class DrillStatementImpl extends AvaticaStatement
    implements DrillStatement, DrillRemoteStatement {
 
+  private final DrillConnectionImpl connection;
+
   // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
   public DrillStatementImpl(DrillConnectionImpl connection, int resultSetType, int resultSetConcurrency, int resultSetHoldability) {
     super(connection, resultSetType, resultSetConcurrency, resultSetHoldability);
+    this.connection = connection;
     connection.openStatementsRegistry.addStatement(this);
   }
 
@@ -52,7 +55,7 @@ public abstract class DrillStatementImpl extends AvaticaStatement
 
   @Override
   public DrillConnectionImpl getConnection() {
-    return (DrillConnectionImpl) connection;
+    return connection;
   }
 
   // WORKAROUND:  Work around AvaticaStatement's code that wraps _any_ exception,