You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by me...@apache.org on 2015/05/13 23:08:14 UTC
[2/2] drill git commit: DRILL-2993: Unthrottle at cancel() to fix
post-cancelation hangs.
DRILL-2993: Unthrottle at cancel() to fix post-cancelation hangs.
Added unthrottling in close().
Cleaned up throttling logic code:
- Applied AtomicBoolean to eliminate race conditions.
- Extracted methods for starting/stopping throttling.
Made small edits to some message:
- Fixed missed, inconsistent ResultsListener log messages.
- Clarified exception message.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/091122c4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/091122c4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/091122c4
Branch: refs/heads/master
Commit: 091122c4e651f1585118fcbf1a092627de03f0e4
Parents: ffbb9c7
Author: dbarclay <db...@maprtech.com>
Authored: Sat May 9 22:17:35 2015 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Wed May 13 11:25:00 2015 -0700
----------------------------------------------------------------------
.../drill/jdbc/impl/DrillResultSetImpl.java | 70 ++++++++++++++++----
1 file changed, 57 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/091122c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 2fe6c28..4fa1f2f 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import net.hydromatic.avatica.AvaticaPrepareResult;
import net.hydromatic.avatica.AvaticaResultSet;
@@ -96,7 +97,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
if ( hasPendingCancelationNotification ) {
hasPendingCancelationNotification = false;
throw new ExecutionCanceledSqlException(
- "SQL statement execution canceled; resultSet closed." );
+ "SQL statement execution canceled; ResultSet now closed." );
}
else {
throw new AlreadyClosedSqlException( "ResultSet is already closed." );
@@ -186,9 +187,18 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
private volatile QueryId queryId;
private volatile UserException executionFailureException;
+
+ // TODO: Revisit "completed". Determine and document exactly what it
+ // means. Some uses imply that it means that incoming messages indicate
+ // that the _query_ has _terminated_ (not necessarily _completing_
+ // normally), while some uses imply that it's some other state of the
+ // ResultListener. Some uses seem redundant.)
volatile boolean completed = false;
- private volatile boolean autoread = true;
+
+ /** Whether throttling of incoming data is active. */
+ private final AtomicBoolean throttled = new AtomicBoolean( false );
private volatile ConnectionThrottle throttle;
+
private volatile boolean closed = false;
// TODO: Rename. It's obvious it's a latch--but what condition or action
// does it represent or control?
@@ -203,6 +213,33 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
logger.debug( "Query listener created." );
}
+ /**
+ * Starts throttling if not currently throttling.
+ * @param throttle the "throttlable" object to throttle
+ * @return true if actually started (wasn't throttling already)
+ */
+ private boolean startThrottlingIfNot( ConnectionThrottle throttle ) {
+ final boolean started = throttled.compareAndSet( false, true );
+ if ( started ) {
+ this.throttle = throttle;
+ throttle.setAutoRead(false);
+ }
+ return started;
+ }
+
+ /**
+ * Stops throttling if currently active.
+ * @return true if actually stopped (was throttling)
+ */
+ private boolean stopThrottlingIfSo() {
+ final boolean stopped = throttled.compareAndSet( true, false );
+ if ( stopped ) {
+ throttle.setAutoRead(true);
+ throttle = null;
+ }
+ return stopped;
+ }
+
// TODO: Doc.: Release what if what is first relative to what?
private boolean releaseIfFirst() {
if (receivedMessage.compareAndSet(false, true)) {
@@ -221,7 +258,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
@Override
public void submissionFailed(UserException ex) {
- logger.debug( "Received query failure.", ex );
+ logger.debug( "Received query failure:", ex );
this.executionFailureException = ex;
completed = true;
close();
@@ -230,11 +267,13 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
@Override
public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
- logger.debug( "Received query data: {}.", result );
+ logger.debug( "Received query data batch: {}.", result );
// If we're in a closed state, just release the message.
if (closed) {
result.release();
+ // TODO: Revisit member completed: Is ResultListener really completed
+ // after only one data batch after being closed?
completed = true;
return;
}
@@ -242,9 +281,9 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
// We're active; let's add to the queue.
batchQueue.add(result);
if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) {
- throttle.setAutoRead(false);
- this.throttle = throttle;
- autoread = false;
+ if ( startThrottlingIfNot( throttle ) ) {
+ logger.debug( "Throttling started at queue size {}.", batchQueue.size() );
+ }
}
releaseIfFirst();
@@ -252,7 +291,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
@Override
public void queryCompleted(QueryState state) {
- logger.debug( "Query completion arrived: {}.", state );
+ logger.debug( "Received query completion: {}.", state );
releaseIfFirst();
completed = true;
}
@@ -282,12 +321,14 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
} else {
QueryDataBatch q = batchQueue.poll(50, TimeUnit.MILLISECONDS);
if (q != null) {
- if (!autoread && batchQueue.size() < THROTTLING_QUEUE_SIZE_THRESHOLD / 2) {
- autoread = true;
- throttle.setAutoRead(true);
- throttle = null;
+ assert THROTTLING_QUEUE_SIZE_THRESHOLD >= 2;
+ if (batchQueue.size() < THROTTLING_QUEUE_SIZE_THRESHOLD / 2) {
+ if ( stopThrottlingIfSo() ) {
+ logger.debug( "Throttling stopped at queue size {}.",
+ batchQueue.size() );
+ }
}
- logger.debug( "Dequeued query data: {}.", q );
+ logger.debug( "Dequeued query data batch: {}.", q );
return q;
}
}
@@ -296,6 +337,9 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
void close() {
closed = true;
+ if ( stopThrottlingIfSo() ) {
+ logger.debug( "Throttling stopped at close() (at queue size {}).", batchQueue.size() );
+ }
while (!batchQueue.isEmpty()) {
QueryDataBatch qrb = batchQueue.poll();
if (qrb != null && qrb.getData() != null) {