You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by me...@apache.org on 2015/05/13 23:08:14 UTC

[2/2] drill git commit: DRILL-2993: Unthrottle at cancel() to fix post-cancelation hangs.

DRILL-2993: Unthrottle at cancel() to fix post-cancelation hangs.

Added unthrottling in close().
Cleaned up throttling logic code:
- Applied AtomicBoolean to eliminate race conditions.
- Extracted methods for starting/stopping throttling.

Made small edits to some messages:
- Fixed missed, inconsistent ResultsListener log messages.
- Clarified exception message.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/091122c4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/091122c4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/091122c4

Branch: refs/heads/master
Commit: 091122c4e651f1585118fcbf1a092627de03f0e4
Parents: ffbb9c7
Author: dbarclay <db...@maprtech.com>
Authored: Sat May 9 22:17:35 2015 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Wed May 13 11:25:00 2015 -0700

----------------------------------------------------------------------
 .../drill/jdbc/impl/DrillResultSetImpl.java     | 70 ++++++++++++++++----
 1 file changed, 57 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/091122c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 2fe6c28..4fa1f2f 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import net.hydromatic.avatica.AvaticaPrepareResult;
 import net.hydromatic.avatica.AvaticaResultSet;
@@ -96,7 +97,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
       if ( hasPendingCancelationNotification ) {
         hasPendingCancelationNotification = false;
         throw new ExecutionCanceledSqlException(
-            "SQL statement execution canceled; resultSet closed." );
+            "SQL statement execution canceled; ResultSet now closed." );
       }
       else {
         throw new AlreadyClosedSqlException( "ResultSet is already closed." );
@@ -186,9 +187,18 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
     private volatile QueryId queryId;
 
     private volatile UserException executionFailureException;
+
+    // TODO:  Revisit "completed".  Determine and document exactly what it
+    // means.  Some uses imply that it means that incoming messages indicate
+    // that the _query_ has _terminated_ (not necessarily _completing_
+    // normally), while some uses imply that it's some other state of the
+    // ResultListener.  Some uses seem redundant.
     volatile boolean completed = false;
-    private volatile boolean autoread = true;
+
+    /** Whether throttling of incoming data is active. */
+    private final AtomicBoolean throttled = new AtomicBoolean( false );
     private volatile ConnectionThrottle throttle;
+
     private volatile boolean closed = false;
     // TODO:  Rename.  It's obvious it's a latch--but what condition or action
     // does it represent or control?
@@ -203,6 +213,33 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
       logger.debug( "Query listener created." );
     }
 
+    /**
+     * Starts throttling (turns off auto-read on {@code throttle}) unless already throttling.
+     * @param  throttle  the "throttlable" object to throttle; also saved in the throttle field
+     * @return  true if actually started (wasn't throttling already)
+     */
+    private boolean startThrottlingIfNot( ConnectionThrottle throttle ) {
+      final boolean started = throttled.compareAndSet( false, true );
+      if ( started ) {
+        this.throttle = throttle;  // NOTE(review): separate step from the flag flip above; a concurrent stop could presumably still see the old field value -- confirm
+        throttle.setAutoRead(false);
+      }
+      return started;
+    }
+
+    /**
+     * Stops throttling (turns auto-read back on for the saved throttle) if currently active.
+     * @return  true if actually stopped (was throttling)
+     */
+    private boolean stopThrottlingIfSo() {
+      final boolean stopped = throttled.compareAndSet( true, false );
+      if ( stopped ) {
+        throttle.setAutoRead(true);  // NOTE(review): assumes the field was already assigned by the matching start -- confirm no caller can get here first
+        throttle = null;  // drop the saved reference now that throttling is off
+      }
+      return stopped;
+    }
+
     // TODO:  Doc.:  Release what if what is first relative to what?
     private boolean releaseIfFirst() {
       if (receivedMessage.compareAndSet(false, true)) {
@@ -221,7 +258,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
 
     @Override
     public void submissionFailed(UserException ex) {
-      logger.debug( "Received query failure.", ex );
+      logger.debug( "Received query failure:", ex );
       this.executionFailureException = ex;
       completed = true;
       close();
@@ -230,11 +267,13 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
 
     @Override
     public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
-      logger.debug( "Received query data: {}.", result );
+      logger.debug( "Received query data batch: {}.", result );
 
       // If we're in a closed state, just release the message.
       if (closed) {
         result.release();
+        // TODO:  Revisit member completed:  Is ResultListener really completed
+        // after only one data batch after being closed?
         completed = true;
         return;
       }
@@ -242,9 +281,9 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
       // We're active; let's add to the queue.
       batchQueue.add(result);
       if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) {
-        throttle.setAutoRead(false);
-        this.throttle = throttle;
-        autoread = false;
+        if ( startThrottlingIfNot( throttle ) ) {
+          logger.debug( "Throttling started at queue size {}.", batchQueue.size() );
+        }
       }
 
       releaseIfFirst();
@@ -252,7 +291,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
 
     @Override
     public void queryCompleted(QueryState state) {
-      logger.debug( "Query completion arrived: {}.", state );
+      logger.debug( "Received query completion: {}.", state );
       releaseIfFirst();
       completed = true;
     }
@@ -282,12 +321,14 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
         } else {
           QueryDataBatch q = batchQueue.poll(50, TimeUnit.MILLISECONDS);
           if (q != null) {
-            if (!autoread && batchQueue.size() < THROTTLING_QUEUE_SIZE_THRESHOLD / 2) {
-              autoread = true;
-              throttle.setAutoRead(true);
-              throttle = null;
+            assert THROTTLING_QUEUE_SIZE_THRESHOLD >= 2;
+            if (batchQueue.size() < THROTTLING_QUEUE_SIZE_THRESHOLD / 2) {
+              if ( stopThrottlingIfSo() ) {
+                logger.debug( "Throttling stopped at queue size {}.",
+                              batchQueue.size() );
+              }
             }
-            logger.debug( "Dequeued query data: {}.", q );
+            logger.debug( "Dequeued query data batch: {}.", q );
             return q;
           }
         }
@@ -296,6 +337,9 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
 
     void close() {
       closed = true;
+      if ( stopThrottlingIfSo() ) {
+        logger.debug( "Throttling stopped at close() (at queue size {}).", batchQueue.size() );
+      }
       while (!batchQueue.isEmpty()) {
         QueryDataBatch qrb = batchQueue.poll();
         if (qrb != null && qrb.getData() != null) {