You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/06/02 22:53:25 UTC
[3/3] drill git commit: DRILL-3159: Part 2--Core: Make JDBC
throttling threshold configurable.
DRILL-3159: Part 2--Core: Make JDBC throttling threshold configurable.
Added configuration/option "drill.jdbc.batch_queue_throttling_threshold".
Applied "drill.jdbc.batch_queue_throttling_threshold" to DrillResultSetImpl.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/acf5566e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/acf5566e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/acf5566e
Branch: refs/heads/master
Commit: acf5566e81ca3fafe83309188499aabb2091b945
Parents: 0c69631
Author: dbarclay <db...@maprtech.com>
Authored: Thu May 14 15:36:14 2015 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Tue Jun 2 12:26:11 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 3 +++
.../src/main/resources/drill-module.conf | 4 ++++
.../drill/jdbc/impl/DrillResultSetImpl.java | 24 ++++++++++++++++----
3 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index be67f9d..91793f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -87,6 +87,9 @@ public interface ExecConstants {
public static final String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl";
public static final String PAM_AUTHENTICATOR_PROFILES = "drill.exec.security.user.auth.pam_profiles";
public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak";
+ /** Size of JDBC batch queue (in batches) above which throttling begins. */
+ public static final String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
+ "drill.jdbc.batch_queue_throttling_threshold";
/**
* Currently if a query is cancelled, but one of the fragments reports the status as FAILED instead of CANCELLED or
http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 66055f1..dbe449a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -163,3 +163,7 @@ drill.exec: {
return_error_for_failure_in_cancelled_fragments: false
}
}
+
+drill.jdbc: {
+ batch_queue_throttling_threshold: 100
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 385ccf5..cb6bd1d 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -31,6 +31,7 @@ import net.hydromatic.avatica.AvaticaResultSet;
import net.hydromatic.avatica.AvaticaStatement;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
@@ -63,7 +64,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
public SchemaChangeListener changeListener;
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
- public final ResultsListener resultsListener = new ResultsListener();
+ public final ResultsListener resultsListener;
private final DrillClient client;
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
// TODO: Resolve: Since is barely manipulated here in DrillResultSetImpl,
@@ -77,6 +78,10 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
ResultSetMetaData resultSetMetaData, TimeZone timeZone) {
super(statement, prepareResult, resultSetMetaData, timeZone);
this.statement = statement;
+ final int batchQueueThrottlingThreshold =
+ this.getStatement().getConnection().getClient().getConfig().getInt(
+ ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD );
+ resultsListener = new ResultsListener( batchQueueThrottlingThreshold );
DrillConnection c = (DrillConnection) statement.getConnection();
DrillClient client = c.getClient();
currentBatch = new RecordBatchLoader(client.getAllocator());
@@ -188,12 +193,13 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
public static class ResultsListener implements UserResultsListener {
private static final Logger logger = getLogger( ResultsListener.class );
- private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100;
private static volatile int nextInstanceId = 1;
/** (Just for logging.) */
private final int instanceId;
+ private final int batchQueueThrottlingThreshold;
+
/** (Just for logging.) */
private volatile QueryId queryId;
@@ -225,8 +231,14 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
Queues.newLinkedBlockingDeque();
- ResultsListener() {
+ /**
+ * ...
+ * @param batchQueueThrottlingThreshold
+ * queue size threshold for throttling server
+ */
+ ResultsListener( int batchQueueThrottlingThreshold ) {
instanceId = nextInstanceId++;
+ this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
logger.debug( "[#{}] Query listener created.", instanceId );
}
@@ -245,7 +257,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
}
/**
- * Stops throttling if currently active.
+ * Stops throttling if currently throttling.
* @return true if actually stopped (was throttling)
*/
private boolean stopThrottlingIfSo() {
@@ -300,7 +312,9 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
// We're active; let's add to the queue.
batchQueue.add(result);
- if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) {
+
+ // Throttle server if queue size has exceed threshold.
+ if (batchQueue.size() > batchQueueThrottlingThreshold ) {
if ( startThrottlingIfNot( throttle ) ) {
logger.debug( "[#{}] Throttling started at queue size {}.",
instanceId, batchQueue.size() );