You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/06/02 22:53:23 UTC
[1/3] drill git commit: DRILL-3034: Apply UserException to
port-binding; handle in embedded-Drill case.
Repository: drill
Updated Branches:
refs/heads/master bca206552 -> acf5566e8
DRILL-3034: Apply UserException to port-binding; handle in embedded-Drill case.
Applied UserException to can't-bind-to-port error. [BasicServer]
Added specific handling of UserException (above case or other) in SQLException
wrapping. [DrillConnectionImpl]
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/71199edd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/71199edd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/71199edd
Branch: refs/heads/master
Commit: 71199edd54e1b9c4ede58c99eb7b6721878b4449
Parents: bca2065
Author: dbarclay <db...@maprtech.com>
Authored: Mon May 11 20:55:19 2015 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Tue Jun 2 12:25:48 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/drill/exec/rpc/BasicServer.java | 14 ++++++++++++--
.../org/apache/drill/jdbc/DrillConnectionImpl.java | 5 +++++
2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/71199edd/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 5c04264..2ebd353 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -32,6 +32,7 @@ import java.io.IOException;
import java.net.BindException;
import java.util.concurrent.ExecutionException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
@@ -190,15 +191,24 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
b.bind(++port).sync();
break;
} catch (Exception e) {
+ // TODO(DRILL-3026): Revisit: Exception is not (always) BindException.
+ // One case is "java.io.IOException: bind() failed: Address already in
+ // use".
if (e instanceof BindException && allowPortHunting) {
continue;
}
- throw new DrillbitStartupException("Could not bind Drillbit", e);
+ final UserException bindException =
+ UserException
+ .resourceError( e )
+ .addContext( "Server type", getClass().getSimpleName() )
+ .message( "Drillbit could not bind to port %s.", port )
+ .build();
+ throw bindException;
}
}
connect = !connect;
- logger.debug("Server started on port {} of type {} ", port, this.getClass().getSimpleName());
+ logger.debug("Server of type {} started on port {}.", getClass().getSimpleName(), port);
return port;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/71199edd/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
index 7c6ef7e..5f82054 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
@@ -35,6 +35,7 @@ import net.hydromatic.avatica.UnregisteredDriver;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
@@ -95,6 +96,10 @@ public abstract class DrillConnectionImpl extends AvaticaConnection
try {
bit = new Drillbit(dConfig, serviceSet);
bit.run();
+ } catch (final UserException e) {
+ throw new SQLException(
+ "Failure in starting embedded Drillbit: " + e.getMessage(),
+ e);
} catch (Exception e) {
// (Include cause exception's text in wrapping exception's text so
// it's more likely to get to user (e.g., via SQLLine), and use
[3/3] drill git commit: DRILL-3159: Part 2--Core: Make JDBC
throttling threshold configurable.
Posted by pa...@apache.org.
DRILL-3159: Part 2--Core: Make JDBC throttling threshold configurable.
Added configuration/option "drill.jdbc.batch_queue_throttling_threshold".
Applied "drill.jdbc.batch_queue_throttling_threshold" to DrillResultSetImpl.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/acf5566e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/acf5566e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/acf5566e
Branch: refs/heads/master
Commit: acf5566e81ca3fafe83309188499aabb2091b945
Parents: 0c69631
Author: dbarclay <db...@maprtech.com>
Authored: Thu May 14 15:36:14 2015 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Tue Jun 2 12:26:11 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 3 +++
.../src/main/resources/drill-module.conf | 4 ++++
.../drill/jdbc/impl/DrillResultSetImpl.java | 24 ++++++++++++++++----
3 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index be67f9d..91793f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -87,6 +87,9 @@ public interface ExecConstants {
public static final String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl";
public static final String PAM_AUTHENTICATOR_PROFILES = "drill.exec.security.user.auth.pam_profiles";
public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak";
+ /** Size of JDBC batch queue (in batches) above which throttling begins. */
+ public static final String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
+ "drill.jdbc.batch_queue_throttling_threshold";
/**
* Currently if a query is cancelled, but one of the fragments reports the status as FAILED instead of CANCELLED or
http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 66055f1..dbe449a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -163,3 +163,7 @@ drill.exec: {
return_error_for_failure_in_cancelled_fragments: false
}
}
+
+drill.jdbc: {
+ batch_queue_throttling_threshold: 100
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 385ccf5..cb6bd1d 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -31,6 +31,7 @@ import net.hydromatic.avatica.AvaticaResultSet;
import net.hydromatic.avatica.AvaticaStatement;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
@@ -63,7 +64,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
public SchemaChangeListener changeListener;
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
- public final ResultsListener resultsListener = new ResultsListener();
+ public final ResultsListener resultsListener;
private final DrillClient client;
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
// TODO: Resolve: Since is barely manipulated here in DrillResultSetImpl,
@@ -77,6 +78,10 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
ResultSetMetaData resultSetMetaData, TimeZone timeZone) {
super(statement, prepareResult, resultSetMetaData, timeZone);
this.statement = statement;
+ final int batchQueueThrottlingThreshold =
+ this.getStatement().getConnection().getClient().getConfig().getInt(
+ ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD );
+ resultsListener = new ResultsListener( batchQueueThrottlingThreshold );
DrillConnection c = (DrillConnection) statement.getConnection();
DrillClient client = c.getClient();
currentBatch = new RecordBatchLoader(client.getAllocator());
@@ -188,12 +193,13 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
public static class ResultsListener implements UserResultsListener {
private static final Logger logger = getLogger( ResultsListener.class );
- private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100;
private static volatile int nextInstanceId = 1;
/** (Just for logging.) */
private final int instanceId;
+ private final int batchQueueThrottlingThreshold;
+
/** (Just for logging.) */
private volatile QueryId queryId;
@@ -225,8 +231,14 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
Queues.newLinkedBlockingDeque();
- ResultsListener() {
+ /**
+ * ...
+ * @param batchQueueThrottlingThreshold
+ * queue size threshold for throttling server
+ */
+ ResultsListener( int batchQueueThrottlingThreshold ) {
instanceId = nextInstanceId++;
+ this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
logger.debug( "[#{}] Query listener created.", instanceId );
}
@@ -245,7 +257,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
}
/**
- * Stops throttling if currently active.
+ * Stops throttling if currently throttling.
* @return true if actually stopped (was throttling)
*/
private boolean stopThrottlingIfSo() {
@@ -300,7 +312,9 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
// We're active; let's add to the queue.
batchQueue.add(result);
- if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) {
+
+ // Throttle server if queue size has exceed threshold.
+ if (batchQueue.size() > batchQueueThrottlingThreshold ) {
if ( startThrottlingIfNot( throttle ) ) {
logger.debug( "[#{}] Throttling started at queue size {}.",
instanceId, batchQueue.size() );
[2/3] drill git commit: DRILL-3159: Part 1--Prep.,
Hyg. for: Make JDBC throttling threshold configurable.
Posted by pa...@apache.org.
DRILL-3159: Part 1--Prep., Hyg. for: Make JDBC throttling threshold configurable.
Cleaned, enhanced DrillResultSet:
- Enhanced ResultsListener logging:
- Added instance ID; added batch numbers.
- Added logging at close (pairing with logging at construction).
- Fixed 2-integer query ID to UUID form.
- Renamed qrb -> qdb; q -> qdb (per recent QueryDataBatch change).
- Added "final" on ResultsListener's logger.
Reduced Avatica-vs.-Drill casting:
- DrillStatementImpl's (Drill)Connection(Impl).
- DrillResultSetImpl's (Drill)Statement(Impl).
Converted a comment in ExecConstants.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0c69631f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0c69631f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0c69631f
Branch: refs/heads/master
Commit: 0c69631fb9e7ef19e503645d6433584a12de23d4
Parents: 71199ed
Author: dbarclay <db...@maprtech.com>
Authored: Wed May 20 18:16:23 2015 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Tue Jun 2 12:26:02 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 3 +-
.../apache/drill/jdbc/DrillJdbc41Factory.java | 11 ++-
.../drill/jdbc/impl/DrillResultSetImpl.java | 84 +++++++++++++-------
.../drill/jdbc/impl/DrillStatementImpl.java | 5 +-
4 files changed, 71 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 8a24e8d..be67f9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -60,7 +60,8 @@ public interface ExecConstants {
public static final String TEMP_DIRECTORIES = "drill.exec.tmp.directories";
public static final String TEMP_FILESYSTEM = "drill.exec.tmp.filesystem";
public static final String INCOMING_BUFFER_IMPL = "drill.exec.buffer.impl";
- public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size"; // incoming buffer size (number of batches)
+ /** incoming buffer size (number of batches) */
+ public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size";
public static final String SPOOLING_BUFFER_DELETE = "drill.exec.buffer.spooling.delete";
public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
public static final String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";
http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
index 6240b62..93fe59d 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
@@ -82,9 +82,14 @@ public class DrillJdbc41Factory extends DrillFactory {
}
@Override
- public DrillResultSetImpl newResultSet(AvaticaStatement statement, AvaticaPrepareResult prepareResult, TimeZone timeZone) {
- final ResultSetMetaData metaData = newResultSetMetaData(statement, prepareResult.getColumnList());
- return new DrillResultSetImpl(statement, (DrillPrepareResult) prepareResult, metaData, timeZone);
+ public DrillResultSetImpl newResultSet( AvaticaStatement statement,
+ AvaticaPrepareResult prepareResult,
+ TimeZone timeZone ) {
+ final ResultSetMetaData metaData =
+ newResultSetMetaData(statement, prepareResult.getColumnList());
+ return new DrillResultSetImpl( (DrillStatementImpl) statement,
+ (DrillPrepareResult) prepareResult,
+ metaData, timeZone);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 4fa1f2f..385ccf5 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -58,6 +58,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
@SuppressWarnings("unused")
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSetImpl.class);
+ private final DrillStatementImpl statement;
+
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
public SchemaChangeListener changeListener;
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
@@ -71,17 +73,21 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
public final DrillCursor cursor;
public boolean hasPendingCancelationNotification;
- public DrillResultSetImpl(AvaticaStatement statement, AvaticaPrepareResult prepareResult,
+ public DrillResultSetImpl(DrillStatementImpl statement, AvaticaPrepareResult prepareResult,
ResultSetMetaData resultSetMetaData, TimeZone timeZone) {
super(statement, prepareResult, resultSetMetaData, timeZone);
+ this.statement = statement;
DrillConnection c = (DrillConnection) statement.getConnection();
DrillClient client = c.getClient();
- // DrillClient client, DrillStatement statement) {
currentBatch = new RecordBatchLoader(client.getAllocator());
this.client = client;
cursor = new DrillCursor(this);
}
+ public DrillStatementImpl getStatement() {
+ return statement;
+ }
+
/**
* Throws AlreadyClosedSqlException or QueryCanceledSqlException if this
* ResultSet is closed.
@@ -171,8 +177,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
}
public String getQueryId() {
- if (resultsListener.queryId != null) {
- return QueryIdHelper.getQueryId(resultsListener.queryId);
+ if (resultsListener.getQueryId() != null) {
+ return QueryIdHelper.getQueryId(resultsListener.getQueryId());
} else {
return null;
}
@@ -180,12 +186,22 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
public static class ResultsListener implements UserResultsListener {
- private static Logger logger = getLogger( ResultsListener.class );
+ private static final Logger logger = getLogger( ResultsListener.class );
private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100;
+ private static volatile int nextInstanceId = 1;
+ /** (Just for logging.) */
+ private final int instanceId;
+
+ /** (Just for logging.) */
private volatile QueryId queryId;
+ /** (Just for logging.) */
+ private int lastReceivedBatchNumber;
+ /** (Just for logging.) */
+ private int lastDequeuedBatchNumber;
+
private volatile UserException executionFailureException;
// TODO: Revisit "completed". Determine and document exactly what it
@@ -210,7 +226,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
ResultsListener() {
- logger.debug( "Query listener created." );
+ instanceId = nextInstanceId++;
+ logger.debug( "[#{}] Query listener created.", instanceId );
}
/**
@@ -252,22 +269,25 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
@Override
public void queryIdArrived(QueryId queryId) {
- logger.debug( "Received query ID: {}.", queryId );
+ logger.debug( "[#{}] Received query ID: {}.",
+ instanceId, QueryIdHelper.getQueryId( queryId ) );
this.queryId = queryId;
}
@Override
public void submissionFailed(UserException ex) {
- logger.debug( "Received query failure:", ex );
+ logger.debug( "Received query failure:", instanceId, ex );
this.executionFailureException = ex;
completed = true;
close();
- logger.info( "Query failed: ", ex );
+ logger.info( "[#{}] Query failed: ", instanceId, ex );
}
@Override
public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
- logger.debug( "Received query data batch: {}.", result );
+ lastReceivedBatchNumber++;
+ logger.debug( "[#{}] Received query data batch #{}: {}.",
+ instanceId, lastReceivedBatchNumber, result );
// If we're in a closed state, just release the message.
if (closed) {
@@ -282,7 +302,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
batchQueue.add(result);
if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) {
if ( startThrottlingIfNot( throttle ) ) {
- logger.debug( "Throttling started at queue size {}.", batchQueue.size() );
+ logger.debug( "[#{}] Throttling started at queue size {}.",
+ instanceId, batchQueue.size() );
}
}
@@ -291,7 +312,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
@Override
public void queryCompleted(QueryState state) {
- logger.debug( "Received query completion: {}.", state );
+ logger.debug( "[#{}] Received query completion: {}.", instanceId, state );
releaseIfFirst();
completed = true;
}
@@ -313,41 +334,50 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
InterruptedException {
while (true) {
if (executionFailureException != null) {
- logger.debug( "Dequeued query failure exception: {}.", executionFailureException );
+ logger.debug( "[#{}] Dequeued query failure exception: {}.",
+ instanceId, executionFailureException );
throw executionFailureException;
}
if (completed && batchQueue.isEmpty()) {
return null;
} else {
- QueryDataBatch q = batchQueue.poll(50, TimeUnit.MILLISECONDS);
- if (q != null) {
- assert THROTTLING_QUEUE_SIZE_THRESHOLD >= 2;
- if (batchQueue.size() < THROTTLING_QUEUE_SIZE_THRESHOLD / 2) {
+ QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
+ if (qdb != null) {
+ lastDequeuedBatchNumber++;
+ logger.debug( "[#{}] Dequeued query data batch #{}: {}.",
+ instanceId, lastDequeuedBatchNumber, qdb );
+
+ // Unthrottle server if queue size has dropped enough below threshold:
+ if ( batchQueue.size() < batchQueueThrottlingThreshold / 2
+ || batchQueue.size() == 0 // (in case threshold < 2)
+ ) {
if ( stopThrottlingIfSo() ) {
- logger.debug( "Throttling stopped at queue size {}.",
- batchQueue.size() );
+ logger.debug( "[#{}] Throttling stopped at queue size {}.",
+ instanceId, batchQueue.size() );
}
}
- logger.debug( "Dequeued query data batch: {}.", q );
- return q;
+ return qdb;
}
}
}
}
void close() {
+ logger.debug( "[#{}] Query listener closing.", instanceId );
closed = true;
if ( stopThrottlingIfSo() ) {
- logger.debug( "Throttling stopped at close() (at queue size {}).", batchQueue.size() );
+ logger.debug( "[#{}] Throttling stopped at close() (at queue size {}).",
+ instanceId, batchQueue.size() );
}
while (!batchQueue.isEmpty()) {
- QueryDataBatch qrb = batchQueue.poll();
- if (qrb != null && qrb.getData() != null) {
- qrb.getData().release();
+ QueryDataBatch qdb = batchQueue.poll();
+ if (qdb != null && qdb.getData() != null) {
+ qdb.getData().release();
}
}
- // close may be called before the first result is received and the main thread is blocked waiting
- // for the result. In that case we want to unblock the main thread.
+ // Close may be called before the first result is received and therefore
+ // when the main thread is blocked waiting for the result. In that case
+ // we want to unblock the main thread.
latch.countDown(); // TODO: Why not call releaseIfFirst as used elsewhere?
completed = true;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
index 5160c31..6610f52 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
@@ -33,9 +33,12 @@ import net.hydromatic.avatica.AvaticaStatement;
public abstract class DrillStatementImpl extends AvaticaStatement
implements DrillStatement, DrillRemoteStatement {
+ private final DrillConnectionImpl connection;
+
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
public DrillStatementImpl(DrillConnectionImpl connection, int resultSetType, int resultSetConcurrency, int resultSetHoldability) {
super(connection, resultSetType, resultSetConcurrency, resultSetHoldability);
+ this.connection = connection;
connection.openStatementsRegistry.addStatement(this);
}
@@ -52,7 +55,7 @@ public abstract class DrillStatementImpl extends AvaticaStatement
@Override
public DrillConnectionImpl getConnection() {
- return (DrillConnectionImpl) connection;
+ return connection;
}
// WORKAROUND: Work around AvaticaStatement's code that wraps _any_ exception,