You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/06/02 22:53:23 UTC

[1/3] drill git commit: DRILL-3034: Apply UserException to port-binding; handle in embedded-Drill case.

Repository: drill
Updated Branches:
  refs/heads/master bca206552 -> acf5566e8


DRILL-3034: Apply UserException to port-binding; handle in embedded-Drill case.

Applied UserException to can't-bind-to-port error.  [BasicServer]
Added specific handling of UserException (above case or other) in SQLException
wrapping.  [DrillConnectionImpl]


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/71199edd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/71199edd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/71199edd

Branch: refs/heads/master
Commit: 71199edd54e1b9c4ede58c99eb7b6721878b4449
Parents: bca2065
Author: dbarclay <db...@maprtech.com>
Authored: Mon May 11 20:55:19 2015 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Tue Jun 2 12:25:48 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/drill/exec/rpc/BasicServer.java   | 14 ++++++++++++--
 .../org/apache/drill/jdbc/DrillConnectionImpl.java    |  5 +++++
 2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/71199edd/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 5c04264..2ebd353 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.net.BindException;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
@@ -190,15 +191,24 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
         b.bind(++port).sync();
         break;
       } catch (Exception e) {
+        // TODO(DRILL-3026):  Revisit:  Exception is not (always) BindException.
+        // One case is "java.io.IOException: bind() failed: Address already in
+        // use".
         if (e instanceof BindException && allowPortHunting) {
           continue;
         }
-        throw new DrillbitStartupException("Could not bind Drillbit", e);
+        final UserException bindException =
+            UserException
+              .resourceError( e )
+              .addContext( "Server type", getClass().getSimpleName() )
+              .message( "Drillbit could not bind to port %s.", port )
+              .build();
+        throw bindException;
       }
     }
 
     connect = !connect;
-    logger.debug("Server started on port {} of type {} ", port, this.getClass().getSimpleName());
+    logger.debug("Server of type {} started on port {}.", getClass().getSimpleName(), port);
     return port;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/71199edd/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
index 7c6ef7e..5f82054 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
@@ -35,6 +35,7 @@ import net.hydromatic.avatica.UnregisteredDriver;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
@@ -95,6 +96,10 @@ public abstract class DrillConnectionImpl extends AvaticaConnection
           try {
             bit = new Drillbit(dConfig, serviceSet);
             bit.run();
+          } catch (final UserException e) {
+            throw new SQLException(
+                "Failure in starting embedded Drillbit: " + e.getMessage(),
+                e);
           } catch (Exception e) {
             // (Include cause exception's text in wrapping exception's text so
             // it's more likely to get to user (e.g., via SQLLine), and use


[3/3] drill git commit: DRILL-3159: Part 2--Core: Make JDBC throttling threshold configurable.

Posted by pa...@apache.org.
DRILL-3159: Part 2--Core: Make JDBC throttling threshold configurable.

Added configuration/option "drill.jdbc.batch_queue_throttling_threshold".
Applied "drill.jdbc.batch_queue_throttling_threshold" to DrillResultSetImpl.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/acf5566e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/acf5566e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/acf5566e

Branch: refs/heads/master
Commit: acf5566e81ca3fafe83309188499aabb2091b945
Parents: 0c69631
Author: dbarclay <db...@maprtech.com>
Authored: Thu May 14 15:36:14 2015 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Tue Jun 2 12:26:11 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  3 +++
 .../src/main/resources/drill-module.conf        |  4 ++++
 .../drill/jdbc/impl/DrillResultSetImpl.java     | 24 ++++++++++++++++----
 3 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index be67f9d..91793f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -87,6 +87,9 @@ public interface ExecConstants {
   public static final String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl";
   public static final String PAM_AUTHENTICATOR_PROFILES = "drill.exec.security.user.auth.pam_profiles";
   public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak";
+  /** Size of JDBC batch queue (in batches) above which throttling begins. */
+  public static final String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
+      "drill.jdbc.batch_queue_throttling_threshold";
 
   /**
    * Currently if a query is cancelled, but one of the fragments reports the status as FAILED instead of CANCELLED or

http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 66055f1..dbe449a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -163,3 +163,7 @@ drill.exec: {
     return_error_for_failure_in_cancelled_fragments: false
   }
 }
+
+drill.jdbc: {
+  batch_queue_throttling_threshold: 100
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 385ccf5..cb6bd1d 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -31,6 +31,7 @@ import net.hydromatic.avatica.AvaticaResultSet;
 import net.hydromatic.avatica.AvaticaStatement;
 
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
@@ -63,7 +64,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
   // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
   public SchemaChangeListener changeListener;
   // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
-  public final ResultsListener resultsListener = new ResultsListener();
+  public final ResultsListener resultsListener;
   private final DrillClient client;
   // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
   // TODO:  Resolve:  Since is barely manipulated here in DrillResultSetImpl,
@@ -77,6 +78,10 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
                             ResultSetMetaData resultSetMetaData, TimeZone timeZone) {
     super(statement, prepareResult, resultSetMetaData, timeZone);
     this.statement = statement;
+    final int batchQueueThrottlingThreshold =
+        this.getStatement().getConnection().getClient().getConfig().getInt(
+            ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD );
+    resultsListener = new ResultsListener( batchQueueThrottlingThreshold );
     DrillConnection c = (DrillConnection) statement.getConnection();
     DrillClient client = c.getClient();
     currentBatch = new RecordBatchLoader(client.getAllocator());
@@ -188,12 +193,13 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
   public static class ResultsListener implements UserResultsListener {
     private static final Logger logger = getLogger( ResultsListener.class );
 
-    private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100;
     private static volatile int nextInstanceId = 1;
 
     /** (Just for logging.) */
     private final int instanceId;
 
+    private final int batchQueueThrottlingThreshold;
+
     /** (Just for logging.) */
     private volatile QueryId queryId;
 
@@ -225,8 +231,14 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
         Queues.newLinkedBlockingDeque();
 
 
-    ResultsListener() {
+    /**
+     * ...
+     * @param  batchQueueThrottlingThreshold
+     *         queue size threshold for throttling server
+     */
+    ResultsListener( int batchQueueThrottlingThreshold ) {
       instanceId = nextInstanceId++;
+      this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
       logger.debug( "[#{}] Query listener created.", instanceId );
     }
 
@@ -245,7 +257,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
     }
 
     /**
-     * Stops throttling if currently active.
+     * Stops throttling if currently throttling.
      * @return  true if actually stopped (was throttling)
      */
     private boolean stopThrottlingIfSo() {
@@ -300,7 +312,9 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
 
       // We're active; let's add to the queue.
       batchQueue.add(result);
-      if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) {
+
+      // Throttle server if queue size has exceed threshold.
+      if (batchQueue.size() > batchQueueThrottlingThreshold ) {
         if ( startThrottlingIfNot( throttle ) ) {
           logger.debug( "[#{}] Throttling started at queue size {}.",
                         instanceId, batchQueue.size() );


[2/3] drill git commit: DRILL-3159: Part 1--Prep., Hyg. for: Make JDBC throttling threshold configurable.

Posted by pa...@apache.org.
DRILL-3159: Part 1--Prep., Hyg. for: Make JDBC throttling threshold configurable.

Cleaned, enhanced DrillResultSet:
- Enhanced ResultsListener logging:
  - Added instance ID; added batch numbers.
  - Added logging at close (pairing with logging at construction).
  - Fixed 2-integer query ID to UUID form.
- Renamed qrb -> qdb; q -> qdb (per recent QueryDataBatch change).
- Added "final" on ResultsListener's logger.

Reduced Avatica-vs.-Drill casting:
- DrillStatementImpl's (Drill)Connection(Impl).
- DrillResultSetImpl's (Drill)Statement(Impl).

Converted a comment in ExecConstants.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0c69631f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0c69631f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0c69631f

Branch: refs/heads/master
Commit: 0c69631fb9e7ef19e503645d6433584a12de23d4
Parents: 71199ed
Author: dbarclay <db...@maprtech.com>
Authored: Wed May 20 18:16:23 2015 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Tue Jun 2 12:26:02 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  3 +-
 .../apache/drill/jdbc/DrillJdbc41Factory.java   | 11 ++-
 .../drill/jdbc/impl/DrillResultSetImpl.java     | 84 +++++++++++++-------
 .../drill/jdbc/impl/DrillStatementImpl.java     |  5 +-
 4 files changed, 71 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 8a24e8d..be67f9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -60,7 +60,8 @@ public interface ExecConstants {
   public static final String TEMP_DIRECTORIES = "drill.exec.tmp.directories";
   public static final String TEMP_FILESYSTEM = "drill.exec.tmp.filesystem";
   public static final String INCOMING_BUFFER_IMPL = "drill.exec.buffer.impl";
-  public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size"; // incoming buffer size (number of batches)
+  /** incoming buffer size (number of batches) */
+  public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size";
   public static final String SPOOLING_BUFFER_DELETE = "drill.exec.buffer.spooling.delete";
   public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
   public static final String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";

http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
index 6240b62..93fe59d 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
@@ -82,9 +82,14 @@ public class DrillJdbc41Factory extends DrillFactory {
   }
 
   @Override
-  public DrillResultSetImpl newResultSet(AvaticaStatement statement, AvaticaPrepareResult prepareResult, TimeZone timeZone) {
-    final ResultSetMetaData metaData = newResultSetMetaData(statement, prepareResult.getColumnList());
-    return new DrillResultSetImpl(statement, (DrillPrepareResult) prepareResult, metaData, timeZone);
+  public DrillResultSetImpl newResultSet( AvaticaStatement statement,
+                                          AvaticaPrepareResult prepareResult,
+                                          TimeZone timeZone ) {
+    final ResultSetMetaData metaData =
+        newResultSetMetaData(statement, prepareResult.getColumnList());
+    return new DrillResultSetImpl( (DrillStatementImpl) statement,
+                                   (DrillPrepareResult) prepareResult,
+                                   metaData, timeZone);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 4fa1f2f..385ccf5 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -58,6 +58,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
   @SuppressWarnings("unused")
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSetImpl.class);
 
+  private final DrillStatementImpl statement;
+
   // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
   public SchemaChangeListener changeListener;
   // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
@@ -71,17 +73,21 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
   public final DrillCursor cursor;
   public boolean hasPendingCancelationNotification;
 
-  public DrillResultSetImpl(AvaticaStatement statement, AvaticaPrepareResult prepareResult,
+  public DrillResultSetImpl(DrillStatementImpl statement, AvaticaPrepareResult prepareResult,
                             ResultSetMetaData resultSetMetaData, TimeZone timeZone) {
     super(statement, prepareResult, resultSetMetaData, timeZone);
+    this.statement = statement;
     DrillConnection c = (DrillConnection) statement.getConnection();
     DrillClient client = c.getClient();
-    // DrillClient client, DrillStatement statement) {
     currentBatch = new RecordBatchLoader(client.getAllocator());
     this.client = client;
     cursor = new DrillCursor(this);
   }
 
+  public DrillStatementImpl getStatement() {
+    return statement;
+  }
+
   /**
    * Throws AlreadyClosedSqlException or QueryCanceledSqlException if this
    * ResultSet is closed.
@@ -171,8 +177,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
   }
 
   public String getQueryId() {
-    if (resultsListener.queryId != null) {
-      return QueryIdHelper.getQueryId(resultsListener.queryId);
+    if (resultsListener.getQueryId() != null) {
+      return QueryIdHelper.getQueryId(resultsListener.getQueryId());
     } else {
       return null;
     }
@@ -180,12 +186,22 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
 
   // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
   public static class ResultsListener implements UserResultsListener {
-    private static Logger logger = getLogger( ResultsListener.class );
+    private static final Logger logger = getLogger( ResultsListener.class );
 
     private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100;
+    private static volatile int nextInstanceId = 1;
 
+    /** (Just for logging.) */
+    private final int instanceId;
+
+    /** (Just for logging.) */
     private volatile QueryId queryId;
 
+    /** (Just for logging.) */
+    private int lastReceivedBatchNumber;
+    /** (Just for logging.) */
+    private int lastDequeuedBatchNumber;
+
     private volatile UserException executionFailureException;
 
     // TODO:  Revisit "completed".  Determine and document exactly what it
@@ -210,7 +226,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
 
 
     ResultsListener() {
-      logger.debug( "Query listener created." );
+      instanceId = nextInstanceId++;
+      logger.debug( "[#{}] Query listener created.", instanceId );
     }
 
     /**
@@ -252,22 +269,25 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
 
     @Override
     public void queryIdArrived(QueryId queryId) {
-      logger.debug( "Received query ID: {}.", queryId );
+      logger.debug( "[#{}] Received query ID: {}.",
+                    instanceId, QueryIdHelper.getQueryId( queryId ) );
       this.queryId = queryId;
     }
 
     @Override
     public void submissionFailed(UserException ex) {
-      logger.debug( "Received query failure:", ex );
+      logger.debug( "Received query failure:", instanceId, ex );
       this.executionFailureException = ex;
       completed = true;
       close();
-      logger.info( "Query failed: ", ex );
+      logger.info( "[#{}] Query failed: ", instanceId, ex );
     }
 
     @Override
     public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
-      logger.debug( "Received query data batch: {}.", result );
+      lastReceivedBatchNumber++;
+      logger.debug( "[#{}] Received query data batch #{}: {}.",
+                    instanceId, lastReceivedBatchNumber, result );
 
       // If we're in a closed state, just release the message.
       if (closed) {
@@ -282,7 +302,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
       batchQueue.add(result);
       if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) {
         if ( startThrottlingIfNot( throttle ) ) {
-          logger.debug( "Throttling started at queue size {}.", batchQueue.size() );
+          logger.debug( "[#{}] Throttling started at queue size {}.",
+                        instanceId, batchQueue.size() );
         }
       }
 
@@ -291,7 +312,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
 
     @Override
     public void queryCompleted(QueryState state) {
-      logger.debug( "Received query completion: {}.", state );
+      logger.debug( "[#{}] Received query completion: {}.", instanceId, state );
       releaseIfFirst();
       completed = true;
     }
@@ -313,41 +334,50 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
                                            InterruptedException {
       while (true) {
         if (executionFailureException != null) {
-          logger.debug( "Dequeued query failure exception: {}.", executionFailureException );
+          logger.debug( "[#{}] Dequeued query failure exception: {}.",
+                        instanceId, executionFailureException );
           throw executionFailureException;
         }
         if (completed && batchQueue.isEmpty()) {
           return null;
         } else {
-          QueryDataBatch q = batchQueue.poll(50, TimeUnit.MILLISECONDS);
-          if (q != null) {
-            assert THROTTLING_QUEUE_SIZE_THRESHOLD >= 2;
-            if (batchQueue.size() < THROTTLING_QUEUE_SIZE_THRESHOLD / 2) {
+          QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
+          if (qdb != null) {
+            lastDequeuedBatchNumber++;
+            logger.debug( "[#{}] Dequeued query data batch #{}: {}.",
+                          instanceId, lastDequeuedBatchNumber, qdb );
+
+            // Unthrottle server if queue size has dropped enough below threshold:
+            if ( batchQueue.size() < batchQueueThrottlingThreshold / 2
+                 || batchQueue.size() == 0  // (in case threshold < 2)
+                 ) {
               if ( stopThrottlingIfSo() ) {
-                logger.debug( "Throttling stopped at queue size {}.",
-                              batchQueue.size() );
+                logger.debug( "[#{}] Throttling stopped at queue size {}.",
+                              instanceId, batchQueue.size() );
               }
             }
-            logger.debug( "Dequeued query data batch: {}.", q );
-            return q;
+            return qdb;
           }
         }
       }
     }
 
     void close() {
+      logger.debug( "[#{}] Query listener closing.", instanceId );
       closed = true;
       if ( stopThrottlingIfSo() ) {
-        logger.debug( "Throttling stopped at close() (at queue size {}).", batchQueue.size() );
+        logger.debug( "[#{}] Throttling stopped at close() (at queue size {}).",
+                      instanceId, batchQueue.size() );
       }
       while (!batchQueue.isEmpty()) {
-        QueryDataBatch qrb = batchQueue.poll();
-        if (qrb != null && qrb.getData() != null) {
-          qrb.getData().release();
+        QueryDataBatch qdb = batchQueue.poll();
+        if (qdb != null && qdb.getData() != null) {
+          qdb.getData().release();
         }
       }
-      // close may be called before the first result is received and the main thread is blocked waiting
-      // for the result. In that case we want to unblock the main thread.
+      // Close may be called before the first result is received and therefore
+      // when the main thread is blocked waiting for the result.  In that case
+      // we want to unblock the main thread.
       latch.countDown(); // TODO:  Why not call releaseIfFirst as used elsewhere?
       completed = true;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
index 5160c31..6610f52 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
@@ -33,9 +33,12 @@ import net.hydromatic.avatica.AvaticaStatement;
 public abstract class DrillStatementImpl extends AvaticaStatement
    implements DrillStatement, DrillRemoteStatement {
 
+  private final DrillConnectionImpl connection;
+
   // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
   public DrillStatementImpl(DrillConnectionImpl connection, int resultSetType, int resultSetConcurrency, int resultSetHoldability) {
     super(connection, resultSetType, resultSetConcurrency, resultSetHoldability);
+    this.connection = connection;
     connection.openStatementsRegistry.addStatement(this);
   }
 
@@ -52,7 +55,7 @@ public abstract class DrillStatementImpl extends AvaticaStatement
 
   @Override
   public DrillConnectionImpl getConnection() {
-    return (DrillConnectionImpl) connection;
+    return connection;
   }
 
   // WORKAROUND:  Work around AvaticaStatement's code that wraps _any_ exception,