You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/06/02 22:53:24 UTC
[2/3] drill git commit: DRILL-3159: Part 1--Prep., Hyg. for: Make JDBC throttling threshold configurable.
DRILL-3159: Part 1--Prep., Hyg. for: Make JDBC throttling threshold configurable.
Cleaned, enhanced DrillResultSet:
- Enhanced ResultsListener logging:
  - Added instance ID; added batch numbers.
  - Added logging at close (pairing with logging at construction).
  - Fixed 2-integer query ID to UUID form.
- Renamed qrb -> qdb; q -> qdb (per recent QueryDataBatch change).
- Added "final" on ResultsListener's logger.

Reduced Avatica-vs.-Drill casting:
- DrillStatementImpl's (Drill)Connection(Impl).
- DrillResultSetImpl's (Drill)Statement(Impl).

Converted a comment in ExecConstants.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0c69631f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0c69631f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0c69631f
Branch: refs/heads/master
Commit: 0c69631fb9e7ef19e503645d6433584a12de23d4
Parents: 71199ed
Author: dbarclay <db...@maprtech.com>
Authored: Wed May 20 18:16:23 2015 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Tue Jun 2 12:26:02 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 3 +-
.../apache/drill/jdbc/DrillJdbc41Factory.java | 11 ++-
.../drill/jdbc/impl/DrillResultSetImpl.java | 84 +++++++++++++-------
.../drill/jdbc/impl/DrillStatementImpl.java | 5 +-
4 files changed, 71 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 8a24e8d..be67f9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -60,7 +60,8 @@ public interface ExecConstants {
public static final String TEMP_DIRECTORIES = "drill.exec.tmp.directories";
public static final String TEMP_FILESYSTEM = "drill.exec.tmp.filesystem";
public static final String INCOMING_BUFFER_IMPL = "drill.exec.buffer.impl";
- public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size"; // incoming buffer size (number of batches)
+ /** incoming buffer size (number of batches) */
+ public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size";
public static final String SPOOLING_BUFFER_DELETE = "drill.exec.buffer.spooling.delete";
public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
public static final String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";
http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
index 6240b62..93fe59d 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java
@@ -82,9 +82,14 @@ public class DrillJdbc41Factory extends DrillFactory {
}
@Override
- public DrillResultSetImpl newResultSet(AvaticaStatement statement, AvaticaPrepareResult prepareResult, TimeZone timeZone) {
- final ResultSetMetaData metaData = newResultSetMetaData(statement, prepareResult.getColumnList());
- return new DrillResultSetImpl(statement, (DrillPrepareResult) prepareResult, metaData, timeZone);
+ public DrillResultSetImpl newResultSet( AvaticaStatement statement,
+ AvaticaPrepareResult prepareResult,
+ TimeZone timeZone ) {
+ final ResultSetMetaData metaData =
+ newResultSetMetaData(statement, prepareResult.getColumnList());
+ return new DrillResultSetImpl( (DrillStatementImpl) statement,
+ (DrillPrepareResult) prepareResult,
+ metaData, timeZone);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 4fa1f2f..385ccf5 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -58,6 +58,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
@SuppressWarnings("unused")
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSetImpl.class);
+ private final DrillStatementImpl statement;
+
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
public SchemaChangeListener changeListener;
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
@@ -71,17 +73,21 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
public final DrillCursor cursor;
public boolean hasPendingCancelationNotification;
- public DrillResultSetImpl(AvaticaStatement statement, AvaticaPrepareResult prepareResult,
+ public DrillResultSetImpl(DrillStatementImpl statement, AvaticaPrepareResult prepareResult,
ResultSetMetaData resultSetMetaData, TimeZone timeZone) {
super(statement, prepareResult, resultSetMetaData, timeZone);
+ this.statement = statement;
DrillConnection c = (DrillConnection) statement.getConnection();
DrillClient client = c.getClient();
- // DrillClient client, DrillStatement statement) {
currentBatch = new RecordBatchLoader(client.getAllocator());
this.client = client;
cursor = new DrillCursor(this);
}
+ public DrillStatementImpl getStatement() {
+ return statement;
+ }
+
/**
* Throws AlreadyClosedSqlException or QueryCanceledSqlException if this
* ResultSet is closed.
@@ -171,8 +177,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
}
public String getQueryId() {
- if (resultsListener.queryId != null) {
- return QueryIdHelper.getQueryId(resultsListener.queryId);
+ if (resultsListener.getQueryId() != null) {
+ return QueryIdHelper.getQueryId(resultsListener.getQueryId());
} else {
return null;
}
@@ -180,12 +186,22 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
public static class ResultsListener implements UserResultsListener {
- private static Logger logger = getLogger( ResultsListener.class );
+ private static final Logger logger = getLogger( ResultsListener.class );
private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100;
+ private static volatile int nextInstanceId = 1;
+ /** (Just for logging.) */
+ private final int instanceId;
+
+ /** (Just for logging.) */
private volatile QueryId queryId;
+ /** (Just for logging.) */
+ private int lastReceivedBatchNumber;
+ /** (Just for logging.) */
+ private int lastDequeuedBatchNumber;
+
private volatile UserException executionFailureException;
// TODO: Revisit "completed". Determine and document exactly what it
@@ -210,7 +226,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
ResultsListener() {
- logger.debug( "Query listener created." );
+ instanceId = nextInstanceId++;
+ logger.debug( "[#{}] Query listener created.", instanceId );
}
/**
@@ -252,22 +269,25 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
@Override
public void queryIdArrived(QueryId queryId) {
- logger.debug( "Received query ID: {}.", queryId );
+ logger.debug( "[#{}] Received query ID: {}.",
+ instanceId, QueryIdHelper.getQueryId( queryId ) );
this.queryId = queryId;
}
@Override
public void submissionFailed(UserException ex) {
- logger.debug( "Received query failure:", ex );
+ logger.debug( "Received query failure:", instanceId, ex );
this.executionFailureException = ex;
completed = true;
close();
- logger.info( "Query failed: ", ex );
+ logger.info( "[#{}] Query failed: ", instanceId, ex );
}
@Override
public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
- logger.debug( "Received query data batch: {}.", result );
+ lastReceivedBatchNumber++;
+ logger.debug( "[#{}] Received query data batch #{}: {}.",
+ instanceId, lastReceivedBatchNumber, result );
// If we're in a closed state, just release the message.
if (closed) {
@@ -282,7 +302,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
batchQueue.add(result);
if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) {
if ( startThrottlingIfNot( throttle ) ) {
- logger.debug( "Throttling started at queue size {}.", batchQueue.size() );
+ logger.debug( "[#{}] Throttling started at queue size {}.",
+ instanceId, batchQueue.size() );
}
}
@@ -291,7 +312,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
@Override
public void queryCompleted(QueryState state) {
- logger.debug( "Received query completion: {}.", state );
+ logger.debug( "[#{}] Received query completion: {}.", instanceId, state );
releaseIfFirst();
completed = true;
}
@@ -313,41 +334,50 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
InterruptedException {
while (true) {
if (executionFailureException != null) {
- logger.debug( "Dequeued query failure exception: {}.", executionFailureException );
+ logger.debug( "[#{}] Dequeued query failure exception: {}.",
+ instanceId, executionFailureException );
throw executionFailureException;
}
if (completed && batchQueue.isEmpty()) {
return null;
} else {
- QueryDataBatch q = batchQueue.poll(50, TimeUnit.MILLISECONDS);
- if (q != null) {
- assert THROTTLING_QUEUE_SIZE_THRESHOLD >= 2;
- if (batchQueue.size() < THROTTLING_QUEUE_SIZE_THRESHOLD / 2) {
+ QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
+ if (qdb != null) {
+ lastDequeuedBatchNumber++;
+ logger.debug( "[#{}] Dequeued query data batch #{}: {}.",
+ instanceId, lastDequeuedBatchNumber, qdb );
+
+ // Unthrottle server if queue size has dropped enough below threshold:
+ if ( batchQueue.size() < batchQueueThrottlingThreshold / 2
+ || batchQueue.size() == 0 // (in case threshold < 2)
+ ) {
if ( stopThrottlingIfSo() ) {
- logger.debug( "Throttling stopped at queue size {}.",
- batchQueue.size() );
+ logger.debug( "[#{}] Throttling stopped at queue size {}.",
+ instanceId, batchQueue.size() );
}
}
- logger.debug( "Dequeued query data batch: {}.", q );
- return q;
+ return qdb;
}
}
}
}
void close() {
+ logger.debug( "[#{}] Query listener closing.", instanceId );
closed = true;
if ( stopThrottlingIfSo() ) {
- logger.debug( "Throttling stopped at close() (at queue size {}).", batchQueue.size() );
+ logger.debug( "[#{}] Throttling stopped at close() (at queue size {}).",
+ instanceId, batchQueue.size() );
}
while (!batchQueue.isEmpty()) {
- QueryDataBatch qrb = batchQueue.poll();
- if (qrb != null && qrb.getData() != null) {
- qrb.getData().release();
+ QueryDataBatch qdb = batchQueue.poll();
+ if (qdb != null && qdb.getData() != null) {
+ qdb.getData().release();
}
}
- // close may be called before the first result is received and the main thread is blocked waiting
- // for the result. In that case we want to unblock the main thread.
+ // Close may be called before the first result is received and therefore
+ // when the main thread is blocked waiting for the result. In that case
+ // we want to unblock the main thread.
latch.countDown(); // TODO: Why not call releaseIfFirst as used elsewhere?
completed = true;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
index 5160c31..6610f52 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
@@ -33,9 +33,12 @@ import net.hydromatic.avatica.AvaticaStatement;
public abstract class DrillStatementImpl extends AvaticaStatement
implements DrillStatement, DrillRemoteStatement {
+ private final DrillConnectionImpl connection;
+
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
public DrillStatementImpl(DrillConnectionImpl connection, int resultSetType, int resultSetConcurrency, int resultSetHoldability) {
super(connection, resultSetType, resultSetConcurrency, resultSetHoldability);
+ this.connection = connection;
connection.openStatementsRegistry.addStatement(this);
}
@@ -52,7 +55,7 @@ public abstract class DrillStatementImpl extends AvaticaStatement
@Override
public DrillConnectionImpl getConnection() {
- return (DrillConnectionImpl) connection;
+ return connection;
}
// WORKAROUND: Work around AvaticaStatement's code that wraps _any_ exception,