You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/15 08:42:47 UTC
[01/17] drill git commit: Add measurement of time taken to start a
thread in timed runnable
Repository: drill
Updated Branches:
refs/heads/master 5c4a9b212 -> d8b197596
Add measurement of time taken to start a thread in timed runnable
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c5f1c83f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c5f1c83f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c5f1c83f
Branch: refs/heads/master
Commit: c5f1c83f26e65ab71955898e875fdf2c3f681953
Parents: 5c4a9b2
Author: Parth Chandra <pa...@apache.org>
Authored: Wed May 13 15:42:06 2015 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Thu May 14 19:53:32 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/store/TimedRunnable.java | 19 ++++++++++++++++++-
1 file changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/c5f1c83f/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
index 240aaef..5a35aff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
@@ -44,12 +44,14 @@ public abstract class TimedRunnable<V> implements Runnable {
private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
private volatile Exception e;
+ private volatile long threadStart;
private volatile long timeNanos;
private volatile V value;
@Override
public final void run() {
long start = System.nanoTime();
+ threadStart=start;
try{
value = runInner();
}catch(Exception e){
@@ -62,6 +64,9 @@ public abstract class TimedRunnable<V> implements Runnable {
protected abstract V runInner() throws Exception ;
protected abstract IOException convertToIOException(Exception e);
+ public long getThreadStart(){
+ return threadStart;
+ }
public long getTimeSpentNanos(){
return timeNanos;
}
@@ -111,7 +116,7 @@ public abstract class TimedRunnable<V> implements Runnable {
*/
public static <V> List<V> run(final String activity, final Logger logger, final List<TimedRunnable<V>> runnables, int parallelism) throws IOException {
Stopwatch watch = new Stopwatch().start();
-
+ long timedRunnableStart=System.nanoTime();
if(runnables.size() == 1){
parallelism = 1;
runnables.get(0).run();
@@ -158,6 +163,10 @@ public abstract class TimedRunnable<V> implements Runnable {
long sum = 0;
long max = 0;
long count = 0;
+ // measure thread creation times
+ long earliestStart=Long.MAX_VALUE;
+ long latestStart=0;
+ long totalStart=0;
IOException excep = null;
for(final TimedRunnable<V> reader : runnables){
try{
@@ -165,6 +174,9 @@ public abstract class TimedRunnable<V> implements Runnable {
sum += reader.getTimeSpentNanos();
count++;
max = Math.max(max, reader.getTimeSpentNanos());
+ earliestStart=Math.min(earliestStart, reader.getThreadStart() - timedRunnableStart);
+ latestStart=Math.max(latestStart, reader.getThreadStart()-timedRunnableStart);
+ totalStart+=latestStart=Math.max(latestStart, reader.getThreadStart()-timedRunnableStart);
}catch(IOException e){
if(excep == null){
excep = e;
@@ -176,11 +188,16 @@ public abstract class TimedRunnable<V> implements Runnable {
if(logger.isInfoEnabled()){
double avg = (sum/1000.0/1000.0)/(count*1.0d);
+ double avgStart = (totalStart/1000.0)/(count*1.0d);
logger.info(
String.format("%s: Executed %d out of %d using %d threads. "
+ "Time: %dms total, %fms avg, %dms max.",
activity, count, runnables.size(), parallelism, watch.elapsed(TimeUnit.MILLISECONDS), avg, max/1000/1000));
+ logger.info(
+ String.format("%s: Executed %d out of %d using %d threads. "
+ + "Earliest start: %f \u03BCs, Latest start: %f \u03BCs, Average start: %f \u03BCs .",
+ activity, count, runnables.size(), parallelism, earliestStart/1000.0, latestStart/1000.0, avgStart));
}
if(excep != null) {
[04/17] drill git commit: DRILL-3072: Update root fragment to not to
modify the Foreman state directly.
Posted by ja...@apache.org.
DRILL-3072: Update root fragment to not to modify the Foreman state directly.
Instead use RPC mechanism to send and receive status updates
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4dcb3e75
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4dcb3e75
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4dcb3e75
Branch: refs/heads/master
Commit: 4dcb3e75643b71daa7f458e1824ac7eb7fc10cde
Parents: e58a306
Author: vkorukanti <ve...@gmail.com>
Authored: Wed May 13 19:42:24 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:52 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/work/foreman/Foreman.java | 9 ++--
.../drill/exec/work/foreman/QueryManager.java | 47 +++++---------------
2 files changed, 17 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/4dcb3e75/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 6840cf3..5d07b49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -205,7 +205,7 @@ public class Foreman implements Runnable {
// resume all pauses through query context
queryContext.getExecutionControls().unpauseAll();
// resume all pauses through all fragment contexts
- queryManager.unpauseExecutingFragments(drillbitContext, rootRunner);
+ queryManager.unpauseExecutingFragments(drillbitContext);
}
/**
@@ -810,7 +810,7 @@ public class Foreman implements Runnable {
assert exception == null;
queryManager.markEndTime();
recordNewState(QueryState.CANCELLATION_REQUESTED);
- queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+ queryManager.cancelExecutingFragments(drillbitContext);
foremanResult.setCompleted(QueryState.CANCELED);
/*
* We don't close the foremanResult until we've gotten
@@ -833,7 +833,7 @@ public class Foreman implements Runnable {
assert exception != null;
queryManager.markEndTime();
recordNewState(QueryState.FAILED);
- queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+ queryManager.cancelExecutingFragments(drillbitContext);
foremanResult.setFailed(exception);
foremanResult.close();
return;
@@ -934,7 +934,8 @@ public class Foreman implements Runnable {
queryManager.addFragmentStatusTracker(rootFragment, true);
- rootRunner = new FragmentExecutor(rootContext, rootFragment, queryManager.newRootStatusHandler(rootContext),
+ rootRunner = new FragmentExecutor(rootContext, rootFragment,
+ queryManager.newRootStatusHandler(rootContext, drillbitContext),
rootOperator);
final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
http://git-wip-us.apache.org/repos/asf/drill/blob/4dcb3e75/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index eed4e17..71b77c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.sys.PStore;
@@ -53,6 +54,7 @@ import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.foreman.Foreman.StateListener;
import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
import org.apache.drill.exec.work.fragment.StatusReporter;
import com.carrotsearch.hppc.IntObjectOpenHashMap;
@@ -176,17 +178,15 @@ public class QueryManager {
/**
* Stop all fragments with currently *known* active status (active as in SENDING, AWAITING_ALLOCATION, RUNNING).
- * (1) Root fragment
- * (a) If the root is pending, delegate the cancellation to local work bus.
- * (b) If the root is running, cancel the fragment directly.
*
* For the actual cancel calls for intermediate and leaf fragments, see
* {@link org.apache.drill.exec.work.batch.ControlMessageHandler#cancelFragment}
+ * (1) Root fragment: pending or running, send the cancel signal through a tunnel.
* (2) Intermediate fragment: pending or running, send the cancel signal through a tunnel (for local and remote
* fragments). The actual cancel is done by delegating the cancel to the work bus.
* (3) Leaf fragment: running, send the cancel signal through a tunnel. The cancel is done directly.
*/
- void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
+ void cancelExecutingFragments(final DrillbitContext drillbitContext) {
final Controller controller = drillbitContext.getController();
for(final FragmentData data : fragmentDataSet) {
switch(data.getState()) {
@@ -194,19 +194,10 @@ public class QueryManager {
case AWAITING_ALLOCATION:
case RUNNING:
final FragmentHandle handle = data.getHandle();
- if (rootRunner.getContext().getHandle().equals(handle)) {
- // Case 1.a: pending root is in the work bus. Delegate the cancel to the work bus.
- final boolean removed = drillbitContext.getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
- // Case 1.b: running root. Cancel directly.
- if (!removed) {
- rootRunner.cancel();
- }
- } else {
- final DrillbitEndpoint endpoint = data.getEndpoint();
- // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
- controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
+ final DrillbitEndpoint endpoint = data.getEndpoint();
+ // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
+ controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
SignalListener.Signal.CANCEL), handle);
- }
break;
case FINISHED:
@@ -221,13 +212,9 @@ public class QueryManager {
/**
* Sends a resume signal to all fragments, regardless of their state, since the fragment might have paused before
- * sending any message. Resume the root fragment directly and all other (local and remote) fragments through the
- * control tunnel.
+ * sending any message. Resume all fragments through the control tunnel.
*/
- void unpauseExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
- if (rootRunner != null) {
- rootRunner.unpause();
- }
+ void unpauseExecutingFragments(final DrillbitContext drillbitContext) {
final Controller controller = drillbitContext.getController();
for(final FragmentData data : fragmentDataSet) {
final DrillbitEndpoint endpoint = data.getEndpoint();
@@ -447,19 +434,9 @@ public class QueryManager {
}
}
- public StatusReporter newRootStatusHandler(final FragmentContext context) {
- return new RootStatusReporter(context);
- }
-
- private class RootStatusReporter extends AbstractStatusReporter {
- private RootStatusReporter(final FragmentContext context) {
- super(context);
- }
-
- @Override
- protected void statusChange(final FragmentHandle handle, final FragmentStatus status) {
- fragmentStatusListener.statusUpdate(status);
- }
+ public StatusReporter newRootStatusHandler(final FragmentContext context, final DrillbitContext dContext) {
+ final ControlTunnel tunnel = dContext.getController().getTunnel(foreman.getQueryContext().getCurrentEndpoint());
+ return new NonRootStatusReporter(context, tunnel);
}
public FragmentStatusListener getFragmentStatusListener(){
[14/17] drill git commit: DRILL-3089: Revert to 2 forked test and
allow override from command line
Posted by ja...@apache.org.
DRILL-3089: Revert to 2 forked test and allow override from command line
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7c782444
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7c782444
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7c782444
Branch: refs/heads/master
Commit: 7c7824443d8a6621bfe76f384c0598b511fb25b8
Parents: 6958246
Author: Aditya Kishore <ad...@apache.org>
Authored: Thu May 14 16:11:08 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:18:02 2015 -0700
----------------------------------------------------------------------
pom.xml | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/7c782444/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 678d201..e4b7bb4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,6 +33,7 @@
<proto.cas.path>${project.basedir}/src/main/protobuf/</proto.cas.path>
<dep.junit.version>4.11</dep.junit.version>
<dep.slf4j.version>1.7.6</dep.slf4j.version>
+ <forkCount>2</forkCount>
<parquet.version>1.6.0rc3-drill-r0.3</parquet.version>
</properties>
@@ -370,7 +371,7 @@
-Dorg.apache.drill.exec.server.Drillbit.system_options="org.apache.drill.exec.compile.ClassTransformer.scalar_replacement=on"
-XX:MaxPermSize=256M -XX:MaxDirectMemorySize=3072M
-XX:+CMSClassUnloadingEnabled -ea</argLine>
- <forkCount>1C</forkCount>
+ <forkCount>${forkCount}</forkCount>
<reuseForks>true</reuseForks>
<additionalClasspathElements>
<additionalClasspathElement>./exec/jdbc/src/test/resources/storage-plugins.json</additionalClasspathElement>
[12/17] drill git commit: Fix PartitionSenderRootExec possible memory
leak.
Posted by ja...@apache.org.
Fix PartitionSenderRootExec possible memory leak.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/62a73bcd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/62a73bcd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/62a73bcd
Branch: refs/heads/master
Commit: 62a73bcd82464b0a48a234e09040ed069033b848
Parents: aaf9fb8
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 21:55:44 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:14:59 2015 -0700
----------------------------------------------------------------------
.../PartitionSenderRootExec.java | 41 +++++++++++++-------
1 file changed, 26 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/62a73bcd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 1872a51..31fc160 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -230,25 +230,36 @@ public class PartitionSenderRootExec extends BaseRootExec {
final List<Partitioner> subPartitioners = createClassInstances(actualPartitions);
int startIndex = 0;
int endIndex = 0;
- for (int i = 0; i < actualPartitions; i++) {
- startIndex = endIndex;
- endIndex = (i < actualPartitions - 1 ) ? startIndex + divisor : outGoingBatchCount;
- if ( i < longTail ) {
- endIndex++;
+
+ boolean success = false;
+ try {
+ for (int i = 0; i < actualPartitions; i++) {
+ startIndex = endIndex;
+ endIndex = (i < actualPartitions - 1) ? startIndex + divisor : outGoingBatchCount;
+ if (i < longTail) {
+ endIndex++;
+ }
+ final OperatorStats partitionStats = new OperatorStats(stats, true);
+ subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
+ startIndex, endIndex);
+ }
+
+ synchronized (this) {
+ partitioner = new PartitionerDecorator(subPartitioners, stats, context);
+ for (int index = 0; index < terminations.size(); index++) {
+ partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
+ }
+ terminations.clear();
}
- final OperatorStats partitionStats = new OperatorStats(stats, true);
- subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
- startIndex, endIndex);
- }
- synchronized(this){
- partitioner = new PartitionerDecorator(subPartitioners, stats, context);
- for (int index = 0; index < terminations.size(); index++) {
- partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
+ success = true;
+ } finally {
+ if (!success) {
+ for (Partitioner p : subPartitioners) {
+ p.clear();
+ }
}
- terminations.clear();
}
-
}
private List<Partitioner> createClassInstances(int actualPartitions) throws SchemaChangeException {
[05/17] drill git commit: DRILL-3063: TestQueriesOnLargeFile leaks
memory with 16M limit Changed the cleanup handling at the end of
ImplCreator.getExec(),
and handle the newly returned null value in FragmentExecutor.run().
Posted by ja...@apache.org.
DRILL-3063: TestQueriesOnLargeFile leaks memory with 16M limit
Changed the cleanup handling at the end of ImplCreator.getExec(), and
handle the newly returned null value in FragmentExecutor.run().
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e58a3063
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e58a3063
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e58a3063
Branch: refs/heads/master
Commit: e58a30638fd7fc700b1e93d05503ddb7a7b76643
Parents: bef60f5
Author: Chris Westin <cw...@yahoo.com>
Authored: Thu May 14 15:53:05 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:52 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/physical/impl/ImplCreator.java | 13 ++++++-------
.../drill/exec/work/fragment/FragmentExecutor.java | 3 +++
2 files changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/e58a3063/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 77ca0f5..66558be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -77,7 +77,6 @@ public class ImplCreator {
Stopwatch watch = new Stopwatch();
watch.start();
- boolean success = false;
try {
final RootExec rootExec = creator.getRootExec(root, context);
// skip over this for SimpleRootExec (testing)
@@ -91,15 +90,15 @@ public class ImplCreator {
"The provided fragment did not have a root node that correctly created a RootExec value.");
}
- success = true;
return rootExec;
- } finally {
- if (!success) {
- for(final CloseableRecordBatch crb : creator.getOperators()) {
- AutoCloseables.close(crb, logger);
- }
+ } catch(Exception e) {
+ context.fail(e);
+ for(final CloseableRecordBatch crb : creator.getOperators()) {
+ AutoCloseables.close(crb, logger);
}
}
+
+ return null;
}
/** Create RootExec and its children (RecordBatches) for given FragmentRoot */
http://git-wip-us.apache.org/repos/asf/drill/blob/e58a3063/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 8c49d68..ffb76b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -226,6 +226,9 @@ public class FragmentExecutor implements Runnable {
drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
root = ImplCreator.getExec(fragmentContext, rootOperator);
+ if (root == null) {
+ return;
+ }
clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
updateState(FragmentState.RUNNING);
[06/17] drill git commit: Update Drill to use latest sqlline
Posted by ja...@apache.org.
Update Drill to use latest sqlline
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3c879974
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3c879974
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3c879974
Branch: refs/heads/master
Commit: 3c879974837ce78c5041f3492519e9ea94fc4a5a
Parents: eca6cd7
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 09:16:41 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:52 2015 -0700
----------------------------------------------------------------------
distribution/src/resources/sqlline | 4 +++-
pom.xml | 4 ++--
2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/3c879974/distribution/src/resources/sqlline
----------------------------------------------------------------------
diff --git a/distribution/src/resources/sqlline b/distribution/src/resources/sqlline
index b1b9014..2c9c783 100644
--- a/distribution/src/resources/sqlline
+++ b/distribution/src/resources/sqlline
@@ -39,7 +39,9 @@ bin=`cd "$bin">/dev/null; pwd`
# is.)
# Put our property specification before previous value of DRILL_SHELL_JAVA_OPTS
# so that it can still be overridden via DRILL_SHELL_JAVA_OPTS.
-DRILL_SHELL_JAVA_OPTS="-Dsqlline.isolation=TRANSACTION_NONE $DRILL_SHELL_JAVA_OPTS"
+#
+# This is not currently needed as the new SQLLine we are using doesn't isolate.
+# DRILL_SHELL_JAVA_OPTS="-Dsqlline.isolation=TRANSACTION_NONE $DRILL_SHELL_JAVA_OPTS"
DRILL_SHELL_JAVA_OPTS="$DRILL_SHELL_JAVA_OPTS -Dlog.path=$DRILL_LOG_DIR/sqlline.log -Dlog.query.path=$DRILL_LOG_DIR/sqlline_queries.json"
http://git-wip-us.apache.org/repos/asf/drill/blob/3c879974/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cf17a79..678d201 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1032,7 +1032,7 @@
<dependency>
<groupId>sqlline</groupId>
<artifactId>sqlline</artifactId>
- <version>1.1.6</version>
+ <version>1.1.9-drill-r3</version>
</dependency>
<dependency>
@@ -1370,7 +1370,7 @@
<dependency>
<groupId>sqlline</groupId>
<artifactId>sqlline</artifactId>
- <version>1.1.6</version>
+ <version>1.1.9-drill-r3</version>
</dependency>
<!-- Test Dependencies -->
<dependency>
[02/17] drill git commit: DRILL-3061: Fix memory leaks in
TestDrillbitResilience
Posted by ja...@apache.org.
DRILL-3061: Fix memory leaks in TestDrillbitResilience
- fixes a race condition in WorkEventBus
- marking TestDrillbitResilience with @Ignore
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/bef60f58
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/bef60f58
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/bef60f58
Branch: refs/heads/master
Commit: bef60f58a79b6bb41c06d59897577d7f7017c526
Parents: c5f1c83
Author: adeneche <ad...@gmail.com>
Authored: Tue May 12 10:53:32 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:43 2015 -0700
----------------------------------------------------------------------
.../drill/exec/rpc/control/WorkEventBus.java | 16 +-
.../apache/drill/exec/rpc/data/DataServer.java | 2 +-
.../org/apache/drill/exec/ZookeeperHelper.java | 14 +-
.../exec/server/TestDrillbitResilience.java | 190 ++++++++++++++++---
4 files changed, 190 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index ddd7828..3e461ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -90,16 +90,16 @@ public class WorkEventBus {
}
public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
- // Check if this was a recently finished (completed or cancelled) fragment. If so, throw away message.
- if (recentlyFinishedFragments.asMap().containsKey(handle)) {
- if (logger.isDebugEnabled()) {
- logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
+ synchronized (this) {
+ // Check if this was a recently finished (completed or cancelled) fragment. If so, throw away message.
+ if (recentlyFinishedFragments.asMap().containsKey(handle)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
+ }
+ return null;
}
- return null;
- }
- // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
- synchronized (this) {
+ // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
final FragmentManager m = managers.get(handle);
if (m != null) {
return m;
http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 0d4077e..061ddcb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -150,7 +150,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
logger.error("Failure while getting fragment manager. {}",
QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(),
fragmentBatch.getReceivingMajorFragmentId(),
- fragmentBatch.getReceivingMinorFragmentIdList()));
+ fragmentBatch.getReceivingMinorFragmentIdList()), e);
ack.clear();
sender.send(new Response(RpcType.ACK, Acks.FAIL));
} finally {
http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
index a5db81d..630c81b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
@@ -46,10 +46,22 @@ public class ZookeeperHelper {
* <p>Will create a "test-data" directory for Zookeeper's use if one doesn't already exist.
*/
public ZookeeperHelper() {
+ this(false);
+ }
+
+ /**
+ * Constructor.
+ *
+ * <p>Will create a "test-data" directory for Zookeeper's use if one doesn't already exist.
+ * @param failureInCancelled pass true if you want failures in cancelled fragments to be reported as failures
+ */
+ public ZookeeperHelper(boolean failureInCancelled) {
final Properties overrideProps = new Properties();
// Forced to disable this, because currently we leak memory which is a known issue for query cancellations.
// Setting this causes unittests to fail.
- // overrideProps.setProperty(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS, "true");
+ if (failureInCancelled) {
+ overrideProps.setProperty(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS, "true");
+ }
config = DrillConfig.create(overrideProps);
zkUrl = config.getString(ExecConstants.ZK_CONNECTION);
http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 8552ec1..696aed8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -30,8 +30,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import com.google.common.base.Preconditions;
import org.apache.commons.math3.util.Pair;
+import org.apache.drill.BaseTestQuery;
import org.apache.drill.QueryTestUtil;
import org.apache.drill.SingleRowListener;
import org.apache.drill.common.AutoCloseables;
@@ -48,11 +48,9 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.physical.impl.ScreenCreator;
import org.apache.drill.exec.physical.impl.SingleSenderCreator.SingleSenderRootExec;
-import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch;
import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
-import org.apache.drill.exec.physical.impl.union.UnionAllRecordBatch;
import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
@@ -88,11 +86,14 @@ import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
+import com.google.common.base.Preconditions;
+
/**
* Test how resilient drillbits are to throwing exceptions during various phases of query
* execution by injecting exceptions at various points and to cancellations in various phases.
* The test cases are mentioned in DRILL-2383.
*/
+@Ignore
public class TestDrillbitResilience extends DrillTest {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TestDrillbitResilience.class);
@@ -164,7 +165,7 @@ public class TestDrillbitResilience extends DrillTest {
// turn off the HTTP server to avoid port conflicts between the drill bits
System.setProperty(ExecConstants.HTTP_ENABLE, "false");
- zkHelper = new ZookeeperHelper();
+ zkHelper = new ZookeeperHelper(true);
zkHelper.startZookeeper(1);
// use a non-null service set so that the drillbits can use port hunting
@@ -269,7 +270,6 @@ public class TestDrillbitResilience extends DrillTest {
assertTrue("There should not be any errors when checking if Drillbits are OK.", errorList.isEmpty());
}
- @SuppressWarnings("static-method")
@After
public void checkDrillbits() {
clearAllInjections(); // so that the drillbit check itself doesn't trigger anything
@@ -355,6 +355,8 @@ public class TestDrillbitResilience extends DrillTest {
@Test
public void settingNoopInjectionsAndQuery() {
+ final long before = countAllocatedMemory();
+
final String controls = createSingleExceptionOnBit(getClass(), "noop", RuntimeException.class, DRILLBIT_BETA);
setControls(controls);
try {
@@ -362,6 +364,9 @@ public class TestDrillbitResilience extends DrillTest {
} catch (final Exception e) {
fail(e.getMessage());
}
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
/**
@@ -381,16 +386,24 @@ public class TestDrillbitResilience extends DrillTest {
}
}
- @SuppressWarnings("static-method")
@Test
public void foreman_runTryBeginning() {
+ final long before = countAllocatedMemory();
+
testForeman("run-try-beginning");
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
- @SuppressWarnings("static-method")
@Test
public void foreman_runTryEnd() {
+ final long before = countAllocatedMemory();
+
testForeman("run-try-end");
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
/**
@@ -463,7 +476,7 @@ public class TestDrillbitResilience extends DrillTest {
}
result.release();
}
- };
+ }
/**
* Thread that cancels the given query id. After the cancel is acknowledged, the latch is counted down.
@@ -537,7 +550,11 @@ public class TestDrillbitResilience extends DrillTest {
* Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
*/
private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
- assertCancelled(controls, TEST_QUERY, listener);
+ assertCancelledWithoutException(controls, listener, TEST_QUERY);
+ }
+
+ private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener, final String query) {
+ assertCancelled(controls, query, listener);
}
/**
@@ -579,6 +596,9 @@ public class TestDrillbitResilience extends DrillTest {
@Test // To test pause and resume. Test hangs if resume did not happen.
public void passThrough() {
+ final long before = countAllocatedMemory();
+
+
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
@Override
public void queryIdArrived(final QueryId queryId) {
@@ -595,11 +615,16 @@ public class TestDrillbitResilience extends DrillTest {
QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
final Pair<QueryState, Exception> result = listener.waitForCompletion();
assertCompleteState(result, QueryState.COMPLETED);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
@Test // Cancellation TC 1: cancel before any result set is returned
@Ignore // DRILL-3052
public void cancelBeforeAnyResultsArrive() {
+ final long before = countAllocatedMemory();
+
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
@Override
@@ -611,10 +636,15 @@ public class TestDrillbitResilience extends DrillTest {
final String controls = createPauseInjection(Foreman.class, "foreman-ready");
assertCancelledWithoutException(controls, listener);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
@Test // Cancellation TC 2: cancel in the middle of fetching result set
public void cancelInMiddleOfFetchingResults() {
+ final long before = countAllocatedMemory();
+
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
private boolean cancelRequested = false;
@@ -632,11 +662,16 @@ public class TestDrillbitResilience extends DrillTest {
// skip once i.e. wait for one batch, so that #dataArrived above triggers #cancelAndResume
final String controls = createPauseInjection(ScreenCreator.class, "sending-data", 1);
assertCancelledWithoutException(controls, listener);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
@Test // Cancellation TC 3: cancel after all result set are produced but not all are fetched
public void cancelAfterAllResultsProduced() {
+ final long before = countAllocatedMemory();
+
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
private int count = 0;
@@ -652,10 +687,16 @@ public class TestDrillbitResilience extends DrillTest {
final String controls = createPauseInjection(ScreenCreator.class, "send-complete");
assertCancelledWithoutException(controls, listener);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
@Test // Cancellation TC 4: cancel after everything is completed and fetched
+ @Ignore
public void cancelAfterEverythingIsCompleted() {
+ final long before = countAllocatedMemory();
+
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
private int count = 0;
@@ -671,16 +712,25 @@ public class TestDrillbitResilience extends DrillTest {
final String controls = createPauseInjection(Foreman.class, "foreman-cleanup");
assertCancelledWithoutException(controls, listener);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
@Test // Completion TC 1: success
public void successfullyCompletes() {
+ final long before = countAllocatedMemory();
+
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
final Pair<QueryState, Exception> result = listener.waitForCompletion();
assertCompleteState(result, QueryState.COMPLETED);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
+
/**
* Given a set of controls, this method ensures TEST_QUERY fails with the given class and desc.
*/
@@ -702,26 +752,41 @@ public class TestDrillbitResilience extends DrillTest {
@Test // Completion TC 2: failed query - before query is executed - while sql parsing
public void failsWhenParsing() {
+ final long before = countAllocatedMemory();
+
final String exceptionDesc = "sql-parsing";
final Class<? extends Throwable> exceptionClass = ForemanSetupException.class;
final String controls = createSingleException(DrillSqlWorker.class, exceptionDesc, exceptionClass);
assertFailsWithException(controls, exceptionClass, exceptionDesc);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
@Test // Completion TC 3: failed query - before query is executed - while sending fragments to other drillbits
public void failsWhenSendingFragments() {
+ final long before = countAllocatedMemory();
+
final String exceptionDesc = "send-fragments";
final Class<? extends Throwable> exceptionClass = ForemanException.class;
final String controls = createSingleException(Foreman.class, exceptionDesc, exceptionClass);
assertFailsWithException(controls, exceptionClass, exceptionDesc);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
@Test // Completion TC 4: failed query - during query execution
public void failsDuringExecution() {
+ final long before = countAllocatedMemory();
+
final String exceptionDesc = "fragment-execution";
final Class<? extends Throwable> exceptionClass = IOException.class;
final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass);
assertFailsWithException(controls, exceptionClass, exceptionDesc);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
/**
@@ -730,8 +795,13 @@ public class TestDrillbitResilience extends DrillTest {
*/
@Test
public void testInterruptingBlockedMergingRecordBatch() {
+ final long before = countAllocatedMemory();
+
final String control = createPauseInjection(MergingRecordBatch.class, "waiting-for-data", 1);
testInterruptingBlockedFragmentsWaitingForData(control);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
/**
@@ -740,8 +810,13 @@ public class TestDrillbitResilience extends DrillTest {
*/
@Test
public void testInterruptingBlockedUnorderedReceiverBatch() {
+ final long before = countAllocatedMemory();
+
final String control = createPauseInjection(UnorderedReceiverBatch.class, "waiting-for-data", 1);
testInterruptingBlockedFragmentsWaitingForData(control);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
private static void testInterruptingBlockedFragmentsWaitingForData(final String control) {
@@ -769,22 +844,27 @@ public class TestDrillbitResilience extends DrillTest {
setSessionOption(HASHAGG.getOptionName(), "true");
setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(), "6");
+ final long before = countAllocatedMemory();
+
final String controls = "{\"injections\" : ["
- + "{"
- + "\"type\" : \"latch\","
- + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
- + "\"desc\" : \"partitioner-sender-latch\""
- + "},"
- + "{"
- + "\"type\" : \"pause\","
- + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
- + "\"desc\" : \"wait-for-fragment-interrupt\","
- + "\"nSkip\" : 1"
- + "}" +
- "]}";
+ + "{"
+ + "\"type\" : \"latch\","
+ + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
+ + "\"desc\" : \"partitioner-sender-latch\""
+ + "},"
+ + "{"
+ + "\"type\" : \"pause\","
+ + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
+ + "\"desc\" : \"wait-for-fragment-interrupt\","
+ + "\"nSkip\" : 1"
+ + "}" +
+ "]}";
final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
assertCancelled(controls, query, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
} finally {
setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
setSessionOption(HASHAGG.getOptionName(), HASHAGG.getDefault().bool_val.toString());
@@ -795,9 +875,75 @@ public class TestDrillbitResilience extends DrillTest {
@Test
public void testInterruptingWhileFragmentIsBlockedInAcquiringSendingTicket() throws Exception {
+
+ final long before = countAllocatedMemory();
+
final String control =
- createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1);
+ createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1);
assertCancelled(control, TEST_QUERY, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ }
+
+ @Test
+ public void memoryLeaksWhenCancelled() {
+ setSessionOption(SLICE_TARGET, "10");
+
+ final long before = countAllocatedMemory();
+
+ final String controls = createPauseInjection(ScreenCreator.class, "sending-data", 1);
+ String query = null;
+ try {
+ query = BaseTestQuery.getFile("queries/tpch/09.sql");
+ } catch (final IOException e) {
+ fail("Failed to get query file: " + e);
+ }
+
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
+ private boolean cancelRequested = false;
+
+ @Override
+ public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+ if (!cancelRequested) {
+ check(queryId != null, "Query id should not be null, since we have waited long enough.");
+ cancelAndResume();
+ cancelRequested = true;
+ }
+ result.release();
+ }
+ };
+
+ assertCancelledWithoutException(controls, listener, query.substring(0, query.length() - 1));
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+
+ setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+ }
+
+ @Test
+ public void memoryLeaksWhenFailed() {
+ setSessionOption(SLICE_TARGET, "10");
+
+ final long before = countAllocatedMemory();
+
+ final String exceptionDesc = "fragment-execution";
+ final Class<? extends Throwable> exceptionClass = IOException.class;
+ final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass);
+ String query = null;
+ try {
+ query = BaseTestQuery.getFile("queries/tpch/09.sql");
+ } catch (final IOException e) {
+ fail("Failed to get query file: " + e);
+ }
+
+ assertFailsWithException(controls, exceptionClass, exceptionDesc, query.substring(0, query.length() - 1));
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+
+ setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
}
@Test // DRILL-3065
[09/17] drill git commit: In the case of extended RPC message
handling, write warning to log
Posted by ja...@apache.org.
In the case of extended RPC message handling, write warning to log
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7fccf7e1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7fccf7e1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7fccf7e1
Branch: refs/heads/master
Commit: 7fccf7e1fd9f108edbd5bf177339173d900b89a3
Parents: 3c87997
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 12:19:46 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:53 2015 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/drill/exec/rpc/RpcBus.java | 11 +++++++++++
1 file changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/7fccf7e1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 92ce312..1a23724 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -31,12 +31,14 @@ import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.protobuf.Internal.EnumLite;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
@@ -203,8 +205,10 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
logger.debug("Received message {}", msg);
}
final Channel channel = connection.getChannel();
+ final Stopwatch watch = new Stopwatch().start();
try{
+
switch (msg.mode) {
case REQUEST: {
// handle message and ack.
@@ -270,6 +274,13 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
throw new UnsupportedOperationException();
}
} finally {
+ long time = watch.elapsed(TimeUnit.MILLISECONDS);
+ long delayThreshold = Integer.parseInt(System.getProperty("drill.exec.rpcDelayWarning", "500"));
+ if (time > delayThreshold) {
+ logger.warn(String.format(
+ "Message of mode %s of rpc type %d took longer than %dms. Actual duration was %dms.",
+ msg.mode, msg.rpcType, delayThreshold, time));
+ }
msg.release();
}
}
[03/17] drill git commit: Add convenience drill-* executables for
sqlline. Rename drill_dumpcat to dumpcat
Posted by ja...@apache.org.
Add convenience drill-* executables for sqlline. Rename drill_dumpcat to dumpcat
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/eca6cd71
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/eca6cd71
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/eca6cd71
Branch: refs/heads/master
Commit: eca6cd7136557273e4ff933f12dc3f24756a9cda
Parents: 4dcb3e7
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 10:13:35 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:52 2015 -0700
----------------------------------------------------------------------
distribution/src/assemble/bin.xml | 17 ++++++++++++++++-
distribution/src/resources/drill-conf | 22 ++++++++++++++++++++++
distribution/src/resources/drill-embedded | 22 ++++++++++++++++++++++
distribution/src/resources/drill-localhost | 22 ++++++++++++++++++++++
distribution/src/resources/drill_dumpcat | 25 -------------------------
distribution/src/resources/dumpcat | 25 +++++++++++++++++++++++++
6 files changed, 107 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/eca6cd71/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index 0576fd2..684ff19 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -245,6 +245,21 @@
<outputDirectory>bin</outputDirectory>
</file>
<file>
+ <source>src/resources/drill-conf</source>
+ <fileMode>0755</fileMode>
+ <outputDirectory>bin</outputDirectory>
+ </file>
+ <file>
+ <source>src/resources/drill-embedded</source>
+ <fileMode>0755</fileMode>
+ <outputDirectory>bin</outputDirectory>
+ </file>
+ <file>
+ <source>src/resources/drill-localhost</source>
+ <fileMode>0755</fileMode>
+ <outputDirectory>bin</outputDirectory>
+ </file>
+ <file>
<source>src/resources/drill-config.sh</source>
<fileMode>0755</fileMode>
<outputDirectory>bin</outputDirectory>
@@ -264,7 +279,7 @@
<outputDirectory>bin</outputDirectory>
</file>
<file>
- <source>src/resources/drill_dumpcat</source>
+ <source>src/resources/dumpcat</source>
<fileMode>0755</fileMode>
<outputDirectory>bin</outputDirectory>
</file>
http://git-wip-us.apache.org/repos/asf/drill/blob/eca6cd71/distribution/src/resources/drill-conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-conf b/distribution/src/resources/drill-conf
new file mode 100755
index 0000000..00e0d25
--- /dev/null
+++ b/distribution/src/resources/drill-conf
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=`cd "$bin">/dev/null; pwd`
+
+# Start sqlline session using connection settings from configuration file
+exec ${bin}/sqlline -u "jdbc:drill:" "$@"
http://git-wip-us.apache.org/repos/asf/drill/blob/eca6cd71/distribution/src/resources/drill-embedded
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-embedded b/distribution/src/resources/drill-embedded
new file mode 100755
index 0000000..d66ba7b
--- /dev/null
+++ b/distribution/src/resources/drill-embedded
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=`cd "$bin">/dev/null; pwd`
+
+# Start a sqlline session with an embedded Drillbit
+exec ${bin}/sqlline -u "jdbc:drill:zk=local" "$@"
http://git-wip-us.apache.org/repos/asf/drill/blob/eca6cd71/distribution/src/resources/drill-localhost
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-localhost b/distribution/src/resources/drill-localhost
new file mode 100755
index 0000000..454045c
--- /dev/null
+++ b/distribution/src/resources/drill-localhost
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=`cd "$bin">/dev/null; pwd`
+
+# Start sqlline session by connection to locally running Drillbit
+exec ${bin}/sqlline -u "jdbc:drill:drillbit=localhost" "$@"
http://git-wip-us.apache.org/repos/asf/drill/blob/eca6cd71/distribution/src/resources/drill_dumpcat
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill_dumpcat b/distribution/src/resources/drill_dumpcat
deleted file mode 100755
index a2ea4d3..0000000
--- a/distribution/src/resources/drill_dumpcat
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bin=`dirname "${BASH_SOURCE-$0}"`
-bin=`cd "$bin">/dev/null; pwd`
-
-. "$bin"/drill-config.sh
-
-DRILL_SHELL_JAVA_OPTS="$DRILL_SHELL_JAVA_OPTS -Dlog.path=$DRILL_LOG_DIR/drill_dumpcat.log"
-
-exec $JAVA $DRILL_SHELL_JAVA_OPTS $DRILL_JAVA_OPTS -cp $CP org.apache.drill.exec.client.DumpCat $@
http://git-wip-us.apache.org/repos/asf/drill/blob/eca6cd71/distribution/src/resources/dumpcat
----------------------------------------------------------------------
diff --git a/distribution/src/resources/dumpcat b/distribution/src/resources/dumpcat
new file mode 100755
index 0000000..a2ea4d3
--- /dev/null
+++ b/distribution/src/resources/dumpcat
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=`cd "$bin">/dev/null; pwd`
+
+. "$bin"/drill-config.sh
+
+DRILL_SHELL_JAVA_OPTS="$DRILL_SHELL_JAVA_OPTS -Dlog.path=$DRILL_LOG_DIR/drill_dumpcat.log"
+
+exec $JAVA $DRILL_SHELL_JAVA_OPTS $DRILL_JAVA_OPTS -cp $CP org.apache.drill.exec.client.DumpCat $@
[08/17] drill git commit: Update sys.options table to only show
options available via ALTER SYSTEM or ALTER SESSION. Move BOOT options to
sys.boot table.
Posted by ja...@apache.org.
Update sys.options table to only show options available via ALTER SYSTEM or ALTER SESSION. Move BOOT options to sys.boot table.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/22d6465e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/22d6465e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/22d6465e
Branch: refs/heads/master
Commit: 22d6465e0037b84bd5749a2b3654de1de21bb9de
Parents: 7fccf7e
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 13:41:26 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:53 2015 -0700
----------------------------------------------------------------------
.../drill/exec/server/options/OptionValue.java | 7 +-
.../drill/exec/store/sys/OptionIterator.java | 126 +++++++++++++++++++
.../drill/exec/store/sys/SystemTable.java | 84 ++-----------
3 files changed, 143 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/22d6465e/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
index 8735fd6..487553e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.google.common.base.Preconditions;
@JsonInclude(Include.NON_NULL)
-public class OptionValue {
+public class OptionValue implements Comparable<OptionValue> {
public static enum OptionType {
BOOT, SYSTEM, SESSION, QUERY
@@ -179,6 +179,11 @@ public class OptionValue {
}
@Override
+ public int compareTo(OptionValue o) {
+ return this.name.compareTo(o.name);
+ }
+
+ @Override
public String toString() {
return "OptionValue [type=" + type + ", name=" + name + ", value=" + getValue() + "]";
}
http://git-wip-us.apache.org/repos/asf/drill/blob/22d6465e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java
new file mode 100644
index 0000000..3e42436
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.server.options.DrillConfigIterator;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.server.options.OptionValue.Kind;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+public class OptionIterator implements Iterator<Object> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OptionIterator.class);
+
+ enum Mode {
+ BOOT, SYS_SESS, BOTH
+ };
+
+ private final OptionManager fragmentOptions;
+ private final Iterator<OptionValue> mergedOptions;
+
+ public OptionIterator(FragmentContext context, Mode mode){
+ final DrillConfigIterator configOptions = new DrillConfigIterator(context.getConfig());
+ fragmentOptions = context.getOptions();
+ final Iterator<OptionValue> optionList;
+ switch(mode){
+ case BOOT:
+ optionList = configOptions.iterator();
+ break;
+ case SYS_SESS:
+ optionList = fragmentOptions.iterator();
+ break;
+ default:
+ optionList = Iterators.concat(configOptions.iterator(), fragmentOptions.iterator());
+ }
+
+ List<OptionValue> values = Lists.newArrayList(optionList);
+ Collections.sort(values);
+ mergedOptions = values.iterator();
+
+ }
+
+ @Override
+ public boolean hasNext() {
+ return mergedOptions.hasNext();
+ }
+
+ @Override
+ public OptionValueWrapper next() {
+ final OptionValue value = mergedOptions.next();
+ final Status status;
+ if (value.type == OptionType.BOOT) {
+ status = Status.BOOT;
+ } else {
+ final OptionValue def = fragmentOptions.getSystemManager().getDefault(value.name);
+ if (value.equals(def)) {
+ status = Status.DEFAULT;
+ } else {
+ status = Status.CHANGED;
+ }
+ }
+ return new OptionValueWrapper(value.name, value.kind, value.type, value.num_val, value.string_val,
+ value.bool_val, value.float_val, status);
+ }
+
+ public static enum Status {
+ BOOT, DEFAULT, CHANGED
+ }
+
+ /**
+ * Wrapper class for OptionValue to add Status
+ */
+ public static class OptionValueWrapper {
+
+
+
+ public final String name;
+ public final Kind kind;
+ public final OptionType type;
+ public final Status status;
+ public final Long num_val;
+ public final String string_val;
+ public final Boolean bool_val;
+ public final Double float_val;
+
+ public OptionValueWrapper(final String name, final Kind kind, final OptionType type, final Long num_val,
+ final String string_val, final Boolean bool_val, final Double float_val,
+ final Status status) {
+ this.name = name;
+ this.kind = kind;
+ this.type = type;
+ this.num_val = num_val;
+ this.string_val = string_val;
+ this.bool_val = bool_val;
+ this.float_val = float_val;
+ this.status = status;
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/22d6465e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
index cd8af08..1d73001 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -17,17 +17,11 @@
*/
package org.apache.drill.exec.store.sys;
-import com.google.common.collect.Iterators;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.server.options.DrillConfigIterator;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.server.options.OptionValue;
-import org.apache.drill.exec.server.options.OptionValue.Kind;
-import org.apache.drill.exec.server.options.OptionValue.OptionType;
-import org.apache.drill.exec.store.sys.SystemTable.OptionValueWrapper.Status;
-
import java.util.Iterator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.sys.OptionIterator.OptionValueWrapper;
+
/**
* An enumeration of all tables in Drill's system ("sys") schema.
* <p>
@@ -41,40 +35,14 @@ public enum SystemTable {
OPTION("options", false, OptionValueWrapper.class) {
@Override
public Iterator<Object> getIterator(final FragmentContext context) {
- final DrillConfigIterator configOptions = new DrillConfigIterator(context.getConfig());
- final OptionManager fragmentOptions = context.getOptions();
- final Iterator<OptionValue> mergedOptions = Iterators.concat(configOptions.iterator(), fragmentOptions.iterator());
- final Iterator<OptionValueWrapper> optionValues = new Iterator<OptionValueWrapper>() {
- @Override
- public boolean hasNext() {
- return mergedOptions.hasNext();
- }
-
- @Override
- public OptionValueWrapper next() {
- final OptionValue value = mergedOptions.next();
- final Status status;
- if (value.type == OptionType.BOOT) {
- status = Status.BOOT;
- } else {
- final OptionValue def = fragmentOptions.getSystemManager().getDefault(value.name);
- if (value.equals(def)) {
- status = Status.DEFAULT;
- } else {
- status = Status.CHANGED;
- }
- }
- return new OptionValueWrapper(value.name, value.kind, value.type, value.num_val, value.string_val,
- value.bool_val, value.float_val, status);
- }
-
- @Override
- public void remove() {
- }
- };
- @SuppressWarnings("unchecked")
- final Iterator<Object> iterator = (Iterator<Object>) (Object) optionValues;
- return iterator;
+ return new OptionIterator(context, OptionIterator.Mode.SYS_SESS);
+ }
+ },
+
+ BOOT("boot", false, OptionValueWrapper.class) {
+ @Override
+ public Iterator<Object> getIterator(final FragmentContext context) {
+ return new OptionIterator(context, OptionIterator.Mode.BOOT);
}
},
@@ -134,35 +102,5 @@ public enum SystemTable {
return pojoClass;
}
- /**
- * Wrapper class for OptionValue to add Status
- */
- public static class OptionValueWrapper {
-
- public static enum Status {
- BOOT, DEFAULT, CHANGED
- }
- public final String name;
- public final Kind kind;
- public final OptionType type;
- public final Status status;
- public final Long num_val;
- public final String string_val;
- public final Boolean bool_val;
- public final Double float_val;
-
- public OptionValueWrapper(final String name, final Kind kind, final OptionType type, final Long num_val,
- final String string_val, final Boolean bool_val, final Double float_val,
- final Status status) {
- this.name = name;
- this.kind = kind;
- this.type = type;
- this.num_val = num_val;
- this.string_val = string_val;
- this.bool_val = bool_val;
- this.float_val = float_val;
- this.status = status;
- }
- }
}
[13/17] drill git commit: When interrupt terminating RPC bus
waitForSendComplete, make sure to release buffers.
Posted by ja...@apache.org.
When interrupt terminating RPC bus waitForSendComplete, make sure to release buffers.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/69582463
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/69582463
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/69582463
Branch: refs/heads/master
Commit: 69582463c12ecb4d9ed987ceca1e13d17b75d7e0
Parents: 62a73bc
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 21:56:14 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:17:59 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/drill/exec/rpc/RpcBus.java | 19 +++++++++----------
1 file changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/69582463/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 812b2fd..9ca09a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -28,7 +28,6 @@ import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.Closeable;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
@@ -96,21 +95,21 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType,
SEND protobufBody, Class<RECEIVE> clazz, boolean allowInEventLoop, ByteBuf... dataBodies) {
- if (!allowInEventLoop) {
- if (connection.inEventLoop()) {
- throw new IllegalStateException("You attempted to send while inside the rpc event thread. This isn't allowed because sending will block if the channel is backed up.");
- }
-
- if (!connection.blockOnNotWritable(listener)) {
- return;
- }
- }
+ Preconditions
+ .checkArgument(
+ allowInEventLoop || !connection.inEventLoop(),
+ "You attempted to send while inside the rpc event thread. This isn't allowed because sending will block if the channel is backed up.");
ByteBuf pBuffer = null;
boolean completed = false;
try {
+ if (!allowInEventLoop && !connection.blockOnNotWritable(listener)) {
+ // if we're in not in the event loop and we're interrupted while blocking, skip sending this message.
+ return;
+ }
+
assert !Arrays.asList(dataBodies).contains(null);
assert rpcConfig.checkSend(rpcType, protobufBody.getClass(), clazz);
[07/17] drill git commit: DRILL-3081: Populate connection name as
late as possible so RPC error messages are reported correctly.
Posted by ja...@apache.org.
DRILL-3081: Populate connection name as late as possible so RPC error messages are reported correctly.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4b0b3a67
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4b0b3a67
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4b0b3a67
Branch: refs/heads/master
Commit: 4b0b3a67ab5e2db2baf34250bdedb174fce648ad
Parents: f0b3671
Author: Parth Chandra <pa...@apache.org>
Authored: Thu May 14 12:27:40 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:53 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/rpc/BasicClient.java | 15 +++++++--
.../exec/rpc/BasicClientWithConnection.java | 1 +
.../org/apache/drill/exec/rpc/BasicServer.java | 16 +++++-----
.../apache/drill/exec/rpc/RemoteConnection.java | 8 +++--
.../java/org/apache/drill/exec/rpc/RpcBus.java | 32 +++++++++++++++-----
.../drill/exec/rpc/RpcExceptionHandler.java | 13 ++++----
.../drill/exec/rpc/control/ControlClient.java | 3 +-
.../drill/exec/rpc/control/ControlServer.java | 1 +
.../apache/drill/exec/rpc/data/DataClient.java | 1 +
.../apache/drill/exec/rpc/data/DataServer.java | 1 +
.../apache/drill/exec/rpc/user/UserServer.java | 1 +
11 files changed, 65 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index a33b370..cf09be3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -33,6 +33,7 @@ import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
+import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -46,7 +47,7 @@ import com.google.protobuf.Parser;
public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
extends RpcBus<T, R> {
- final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
// The percentage of time that should pass before sending a ping message to ensure server doesn't time us out. For
// example, if timeout is set to 30 seconds and we set percentage to 0.5, then if no write has happened within 15
@@ -101,7 +102,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
}
pipe.addLast("message-handler", new InboundHandler(connection));
- pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
+ pipe.addLast("exception-handler", new RpcExceptionHandler(connection));
}
}); //
@@ -110,6 +111,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
// }
}
+ public R initRemoteConnection(SocketChannel channel){
+ local=channel.localAddress();
+ remote=channel.remoteAddress();
+ return null;
+ };
+
private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);
/**
@@ -200,12 +207,14 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
// So there is no point propagating the interruption as failure immediately.
long remainingWaitTimeMills = 120000;
long startTime = System.currentTimeMillis();
-
// logger.debug("Connection operation finished. Success: {}", future.isSuccess());
while(true) {
try {
future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
if (future.isSuccess()) {
+ SocketAddress remote = future.channel().remoteAddress();
+ SocketAddress local = future.channel().localAddress();
+ setAddresses(remote, local);
// send a handshake on the current thread. This is the only time we will send from within the event thread.
// We can do this because the connection will not be backed up.
send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index ab54fa1..c194b5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -51,6 +51,7 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
@Override
public ServerConnection initRemoteConnection(SocketChannel channel) {
+ super.initRemoteConnection(channel);
return new ServerConnection(connectionName, channel, alloc);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 6a7bc65..5c04264 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -85,11 +85,11 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
if (rpcMapping.hasTimeout()) {
pipe.addLast(TIMEOUT_HANDLER,
- new LogggingReadTimeoutHandler(connection.getName(), rpcMapping.getTimeout()));
+ new LogggingReadTimeoutHandler(connection, rpcMapping.getTimeout()));
}
pipe.addLast("message-handler", new InboundHandler(connection));
- pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
+ pipe.addLast("exception-handler", new RpcExceptionHandler(connection));
connect = true;
// logger.debug("Server connection initialization completed.");
@@ -101,19 +101,19 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
// }
}
- private class LogggingReadTimeoutHandler extends ReadTimeoutHandler {
+ private class LogggingReadTimeoutHandler<C extends RemoteConnection> extends ReadTimeoutHandler {
- private final String name;
+ private final C connection;
private final int timeoutSeconds;
- public LogggingReadTimeoutHandler(String name, int timeoutSeconds) {
+ public LogggingReadTimeoutHandler(C connection, int timeoutSeconds) {
super(timeoutSeconds);
- this.name = name;
+ this.connection = connection;
this.timeoutSeconds = timeoutSeconds;
}
@Override
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
- logger.info("RPC connection {} timed out. Timeout was set to {} seconds. Closing connection.", name,
+ logger.info("RPC connection {} timed out. Timeout was set to {} seconds. Closing connection.", connection.getName(),
timeoutSeconds);
super.readTimedOut(ctx);
}
@@ -178,6 +178,8 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
@Override
public C initRemoteConnection(SocketChannel channel) {
+ local = channel.localAddress();
+ remote = channel.remoteAddress();
return null;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 199569c..30abcc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -33,7 +33,8 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
private final Channel channel;
private final WriteManager writeManager;
- private final String name;
+ private String name;
+ private final String clientName;
public boolean inEventLoop(){
return channel.eventLoop().inEventLoop();
@@ -42,7 +43,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
public RemoteConnection(SocketChannel channel, String name) {
super();
this.channel = channel;
- this.name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), name);
+ this.clientName = name;
this.writeManager = new WriteManager();
channel.pipeline().addLast(new BackPressureHandler());
channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@@ -57,6 +58,9 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
}
public String getName() {
+ if(name == null){
+ name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), clientName);
+ }
return name;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 1a23724..812b2fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -29,6 +29,7 @@ import io.netty.util.concurrent.GenericFutureListener;
import java.io.Closeable;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -67,10 +68,19 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
protected final RpcConfig rpcConfig;
+ protected volatile SocketAddress local;
+ protected volatile SocketAddress remote;
+
+
public RpcBus(RpcConfig rpcConfig) {
this.rpcConfig = rpcConfig;
}
+ protected void setAddresses(SocketAddress remote, SocketAddress local){
+ this.remote = remote;
+ this.local = local;
+ }
+
<SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
DrillRpcFutureImpl<RECEIVE> rpcFuture = new DrillRpcFutureImpl<RECEIVE>();
@@ -133,21 +143,27 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> {
- final InetSocketAddress local;
- final InetSocketAddress remote;
final C clientConnection;
+ private final Channel channel;
- public ChannelClosedHandler(C clientConnection, InetSocketAddress local, InetSocketAddress remote) {
- this.local = local;
- this.remote = remote;
+ public ChannelClosedHandler(C clientConnection, Channel channel) {
+ this.channel = channel;
this.clientConnection = clientConnection;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- String msg = String.format("Channel closed %s <--> %s.", local, remote);
+ String msg;
+ if(local!=null) {
+ msg = String.format("Channel closed %s <--> %s.", local, remote);
+ }else{
+ msg = String.format("Channel closed %s <--> %s.", future.channel().localAddress(), future.channel().remoteAddress());
+ }
+
if (RpcBus.this.isClient()) {
- logger.info(String.format(msg));
+ if(local != null) {
+ logger.info(String.format(msg));
+ }
} else {
queue.channelClosed(new ChannelClosedException(msg));
}
@@ -158,7 +174,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
}
protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel channel, C clientConnection) {
- return new ChannelClosedHandler(clientConnection, channel.localAddress(), channel.remoteAddress());
+ return new ChannelClosedHandler(clientConnection, channel);
}
private class ResponseSenderImpl implements ResponseSender {
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
index c12ff7b..46b7702 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
@@ -19,23 +19,24 @@ package org.apache.drill.exec.rpc;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
+import org.eclipse.jetty.io.Connection;
-public class RpcExceptionHandler implements ChannelHandler{
+public class RpcExceptionHandler<C extends RemoteConnection> implements ChannelHandler{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcExceptionHandler.class);
- private final String name;
+ private final C connection;
- public RpcExceptionHandler(String name) {
- this.name = name;
+ public RpcExceptionHandler(C connection){
+ this.connection = connection;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if(!ctx.channel().isOpen() || cause.getMessage().equals("Connection reset by peer")){
- logger.warn("Exception occurred with closed channel. Connection: {}", name, cause);
+ logger.warn("Exception occurred with closed channel. Connection: {}", connection.getName(), cause);
return;
}else{
- logger.error("Exception in RPC communication. Connection: {}. Closing connection.", name, cause);
+ logger.error("Exception in RPC communication. Connection: {}. Closing connection.", connection.getName(), cause);
ctx.close();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index f191271..159f1df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -40,7 +40,7 @@ import com.google.protobuf.MessageLite;
public class ControlClient extends BasicClient<RpcType, ControlConnection, BitControlHandshake, BitControlHandshake>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);
private final ControlMessageHandler handler;
private final DrillbitEndpoint remoteEndpoint;
@@ -66,6 +66,7 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
@SuppressWarnings("unchecked")
@Override
public ControlConnection initRemoteConnection(SocketChannel channel) {
+ super.initRemoteConnection(channel);
this.connection = new ControlConnection("control client", channel,
(RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
return connection;
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index 5e405ab..98ce9e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -69,6 +69,7 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
@Override
public ControlConnection initRemoteConnection(SocketChannel channel) {
+ super.initRemoteConnection(channel);
return new ControlConnection("control server", channel, this, allocator);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index b8a07c7..544bab9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -56,6 +56,7 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
@Override
public DataClientConnection initRemoteConnection(SocketChannel channel) {
+ super.initRemoteConnection(channel);
this.connection = new DataClientConnection(channel, this);
return connection;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 061ddcb..80d2d6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -77,6 +77,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
@Override
public BitServerConnection initRemoteConnection(SocketChannel channel) {
+ super.initRemoteConnection(channel);
return new BitServerConnection(channel, context.getAllocator());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 72b07ba..a197356 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -175,6 +175,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
@Override
public UserClientConnection initRemoteConnection(SocketChannel channel) {
+ super.initRemoteConnection(channel);
return new UserClientConnection(channel);
}
[15/17] drill git commit: DRILL-3098: Set Unix style "line.separator"
for tests
Posted by ja...@apache.org.
DRILL-3098: Set Unix style "line.separator" for tests
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/984ee012
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/984ee012
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/984ee012
Branch: refs/heads/master
Commit: 984ee012ae0e0a42efa1b6512d7c14dd7f287d1a
Parents: 7c78244
Author: Aditya Kishore <ad...@apache.org>
Authored: Thu May 14 21:51:18 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:18:03 2015 -0700
----------------------------------------------------------------------
common/src/test/java/org/apache/drill/test/DrillTest.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/984ee012/common/src/test/java/org/apache/drill/test/DrillTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/drill/test/DrillTest.java b/common/src/test/java/org/apache/drill/test/DrillTest.java
index 8abcb6a..bbe014f 100644
--- a/common/src/test/java/org/apache/drill/test/DrillTest.java
+++ b/common/src/test/java/org/apache/drill/test/DrillTest.java
@@ -40,7 +40,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public class DrillTest {
static final Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTest.class);
- protected static final ObjectMapper objectMapper = new ObjectMapper();
+ protected static final ObjectMapper objectMapper;
+ static {
+ System.setProperty("line.separator", "\n");
+ objectMapper = new ObjectMapper();
+ }
static final SystemManager manager = new SystemManager();
[17/17] drill git commit: DRILL-3100:
TestImpersonationDisabledWithMiniDFS fails on Windows
Posted by ja...@apache.org.
DRILL-3100: TestImpersonationDisabledWithMiniDFS fails on Windows
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d8b19759
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d8b19759
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d8b19759
Branch: refs/heads/master
Commit: d8b19759657698581cc0d01d7038797952888123
Parents: 36ff259
Author: Aditya Kishore <ad...@apache.org>
Authored: Thu May 14 22:00:34 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:18:03 2015 -0700
----------------------------------------------------------------------
.../impersonation/TestImpersonationDisabledWithMiniDFS.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/d8b19759/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
index e38d6da..6c3b96f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
@@ -46,6 +46,7 @@ public class TestImpersonationDisabledWithMiniDFS extends BaseTestImpersonation
miniDfsPluginConfig.connection = conf.get(FileSystem.FS_DEFAULT_NAME_KEY);
Map<String, WorkspaceConfig> workspaces = Maps.newHashMap(lfsPluginConfig.workspaces);
+ createAndAddWorkspace(dfsCluster.getFileSystem(), "dfstemp", "/tmp", (short)0777, processUser, processUser, workspaces);
miniDfsPluginConfig.workspaces = workspaces;
miniDfsPluginConfig.formats = ImmutableMap.copyOf(lfsPluginConfig.formats);
@@ -54,13 +55,13 @@ public class TestImpersonationDisabledWithMiniDFS extends BaseTestImpersonation
pluginRegistry.createOrUpdate(MINIDFS_STORAGE_PLUGIN_NAME, miniDfsPluginConfig, true);
// Create test table in minidfs.tmp schema for use in test queries
- test(String.format("CREATE TABLE %s.tmp.dfsRegion AS SELECT * FROM cp.`region.json`", MINIDFS_STORAGE_PLUGIN_NAME));
+ test(String.format("CREATE TABLE %s.dfstemp.dfsRegion AS SELECT * FROM cp.`region.json`", MINIDFS_STORAGE_PLUGIN_NAME));
}
@Test // DRILL-3037
public void testSimpleQuery() throws Exception {
final String query =
- String.format("SELECT sales_city, sales_country FROM tmp.dfsRegion ORDER BY region_id DESC LIMIT 2");
+ String.format("SELECT sales_city, sales_country FROM dfstemp.dfsRegion ORDER BY region_id DESC LIMIT 2");
testBuilder()
.optionSettingQueriesForTestQuery(String.format("USE %s", MINIDFS_STORAGE_PLUGIN_NAME))
[10/17] drill git commit: Update configuration defaults. Rename
bounds system property to drill.enable_unsafe_memory_access.
Posted by ja...@apache.org.
Update configuration defaults. Rename bounds system property to drill.enable_unsafe_memory_access.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f0b3671d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f0b3671d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f0b3671d
Branch: refs/heads/master
Commit: f0b3671d60d92800411d81ad283430d0eef01c96
Parents: 22d6465
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 18:09:11 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:53 2015 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/drill/exec/util/AssertionUtil.java | 2 +-
exec/java-exec/src/main/resources/drill-module.conf | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/f0b3671d/exec/java-exec/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/AssertionUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
index 20c2f8e..2348e62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
@@ -28,7 +28,7 @@ public class AssertionUtil {
boolean isAssertEnabled = false;
assert isAssertEnabled = true;
ASSERT_ENABLED = isAssertEnabled;
- BOUNDS_CHECKING_ENABLED = ASSERT_ENABLED || !"false".equals(System.getProperty("bounds"));
+ BOUNDS_CHECKING_ENABLED = ASSERT_ENABLED || !"true".equals(System.getProperty("drill.enable_unsafe_memory_access"));
}
public static boolean isAssertionsEnabled(){
http://git-wip-us.apache.org/repos/asf/drill/blob/f0b3671d/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 6fb9340..66055f1 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -37,7 +37,7 @@ drill.exec: {
}
},
bit: {
- timeout: 30,
+ timeout: 300,
server: {
port : 31011,
retry:{
[11/17] drill git commit: DRILL-3052,
DRILL-3066: Improve fragment state management in face of early
cancellation.
Posted by ja...@apache.org.
DRILL-3052, DRILL-3066: Improve fragment state management in face of early cancellation.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/aaf9fb83
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/aaf9fb83
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/aaf9fb83
Branch: refs/heads/master
Commit: aaf9fb834e02b9a3483758dd6eb475cc781db866
Parents: 4b0b3a6
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 20:07:29 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:14:56 2015 -0700
----------------------------------------------------------------------
.../exec/work/fragment/FragmentExecutor.java | 106 ++++++++++---------
1 file changed, 56 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/aaf9fb83/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index ffb76b1..e5e0700 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.work.fragment;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.drill.common.DeferredException;
@@ -53,6 +54,7 @@ public class FragmentExecutor implements Runnable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(FragmentExecutor.class);
+ private final AtomicBoolean hasCloseoutThread = new AtomicBoolean(false);
private final String fragmentName;
private final FragmentContext fragmentContext;
private final StatusReporter listener;
@@ -139,25 +141,10 @@ public class FragmentExecutor implements Runnable {
* so we need to be careful about the state transitions that can result.
*/
public void cancel() {
- /*
- * When cancel() is called before run(), root is not initialized and the executor is not
- * ready to accept external events. So do not wait to change the state.
- *
- * For example, consider the case when the Foreman sets up the root fragment executor which is
- * waiting on incoming data, but the Foreman fails to setup non-root fragment executors. The
- * run() method on the root executor will never be called, and the executor will never be ready
- * to accept external events. This would make the cancelling thread wait forever, if it was waiting on
- * acceptExternalEvents.
- */
- synchronized (this) {
- if (root != null) {
- acceptExternalEvents.awaitUninterruptibly();
- } else {
- // This fragment may or may not start running. If it doesn't then closeOutResources() will never be called.
- // Assuming it's safe to call closeOutResources() multiple times, we call it here explicitly in case this
- // fragment will never start running.
- closeOutResources();
- }
+ final boolean thisIsOnlyThread = this.hasCloseoutThread.compareAndSet(false, true);
+
+ if (!thisIsOnlyThread) {
+ acceptExternalEvents.awaitUninterruptibly();
/*
* We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
@@ -165,14 +152,33 @@ public class FragmentExecutor implements Runnable {
updateState(FragmentState.CANCELLATION_REQUESTED);
/*
- * Interrupt the thread so that it exits from any blocking operation it could be executing currently.
+ * Interrupt the thread so that it exits from any blocking operation it could be executing currently. We
+ * synchronize here to ensure we don't accidentally create a race condition where we interrupt the close out
+ * procedure of the main thread.
*/
- final Thread myThread = myThreadRef.get();
- if (myThread != null) {
- logger.debug("Interrupting fragment thread {}", myThread.getName());
- myThread.interrupt();
+ synchronized (myThreadRef) {
+ final Thread myThread = myThreadRef.get();
+ if (myThread != null) {
+ logger.debug("Interrupting fragment thread {}", myThread.getName());
+ myThread.interrupt();
+ }
}
+ } else {
+ updateState(FragmentState.CANCELLATION_REQUESTED);
+ cleanup(FragmentState.FINISHED);
}
+
+ }
+
+ private void cleanup(FragmentState state) {
+
+ closeOutResources();
+
+ updateState(state);
+ // send the final state of the fragment. only the main execution thread can send the final state and it can
+ // only be sent once.
+ sendFinalState();
+
}
/**
@@ -203,6 +209,11 @@ public class FragmentExecutor implements Runnable {
@Override
public void run() {
+ // if a cancel thread has already entered this executor, we have not reason to continue.
+ if (!hasCloseoutThread.compareAndSet(false, true)) {
+ return;
+ }
+
final Thread myThread = Thread.currentThread();
myThreadRef.set(myThread);
final String originalThreadName = myThread.getName();
@@ -216,31 +227,24 @@ public class FragmentExecutor implements Runnable {
myThread.setName(newThreadName);
- synchronized (this) {
- /*
- * fragmentState might have changed even before this method is called e.g. cancel()
- */
- if (shouldContinue()) {
- // if we didn't get the root operator when the executor was created, create it now.
- final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
- drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
+ // if we didn't get the root operator when the executor was created, create it now.
+ final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
+ drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
root = ImplCreator.getExec(fragmentContext, rootOperator);
if (root == null) {
return;
}
- clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
- updateState(FragmentState.RUNNING);
+ clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
+ updateState(FragmentState.RUNNING);
- acceptExternalEvents.countDown();
+ acceptExternalEvents.countDown();
- final DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
- logger.debug("Starting fragment {}:{} on {}:{}",
- fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(),
- endpoint.getAddress(), endpoint.getUserPort());
- }
- }
+ final DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
+ logger.debug("Starting fragment {}:{} on {}:{}",
+ fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(),
+ endpoint.getAddress(), endpoint.getUserPort());
final UserGroupInformation queryUserUgi = fragmentContext.isImpersonationEnabled() ?
ImpersonationUtil.createProxyUgi(fragmentContext.getQueryUserName()) :
@@ -275,21 +279,23 @@ public class FragmentExecutor implements Runnable {
fail(e);
} finally {
+ // no longer allow this thread to be interrupted. We synchronize here to make sure that cancel can't set an
+ // interruption after we have moved beyond this block.
+ synchronized (myThreadRef) {
+ myThreadRef.set(null);
+ Thread.interrupted();
+ }
+
// We need to sure we countDown at least once. We'll do it here to guarantee that.
acceptExternalEvents.countDown();
- closeOutResources();
-
- updateState(FragmentState.FINISHED);
- // send the final state of the fragment. only the main execution thread can send the final state and it can
- // only be sent once.
- sendFinalState();
+ // here we could be in FAILED, RUNNING, or CANCELLATION_REQUESTED
+ cleanup(FragmentState.FINISHED);
clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
myThread.setName(originalThreadName);
- myThreadRef.set(null);
}
}
@@ -404,10 +410,10 @@ public class FragmentExecutor implements Runnable {
errorStateChange(current, target);
}
- // these should never be requested.
+ // these should never be requested.
+ case CANCELLED:
case SENDING:
case AWAITING_ALLOCATION:
- case CANCELLED:
default:
errorStateChange(current, target);
}
[16/17] drill git commit: DRILL-3099: FileSelection's selectionRoot
does not include the scheme and authority
Posted by ja...@apache.org.
DRILL-3099: FileSelection's selectionRoot does not include the scheme and authority
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/36ff2590
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/36ff2590
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/36ff2590
Branch: refs/heads/master
Commit: 36ff259078f3fa9ebc2fb552eaedc8ce14298636
Parents: 984ee01
Author: Aditya Kishore <ad...@apache.org>
Authored: Thu May 14 21:56:13 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:18:03 2015 -0700
----------------------------------------------------------------------
.../drill/exec/store/dfs/FileSelection.java | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/36ff2590/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index be9784e..e7f7b28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.dfs;
import java.io.IOException;
+import java.net.URI;
import java.util.Collections;
import java.util.List;
@@ -127,29 +128,30 @@ public class FileSelection {
return statuses;
}
- public static String commonPath(FileStatus... paths){
+ private static String commonPath(FileStatus... paths) {
String commonPath = "";
String[][] folders = new String[paths.length][];
- for(int i = 0; i < paths.length; i++){
+ for (int i = 0; i < paths.length; i++) {
folders[i] = Path.getPathWithoutSchemeAndAuthority(paths[i].getPath()).toString().split("/");
}
- for(int j = 0; j < folders[0].length; j++){
+ for (int j = 0; j < folders[0].length; j++) {
String thisFolder = folders[0][j];
boolean allMatched = true;
- for(int i = 1; i < folders.length && allMatched; i++){
- if(folders[i].length < j){
+ for (int i = 1; i < folders.length && allMatched; i++) {
+ if (folders[i].length < j) {
allMatched = false;
break;
}
allMatched &= folders[i][j].equals(thisFolder);
}
- if(allMatched){
+ if (allMatched) {
commonPath += thisFolder + "/";
- }else{
+ } else {
break;
}
}
- return commonPath;
+ URI oneURI = paths[0].getPath().toUri();
+ return new Path(oneURI.getScheme(), oneURI.getAuthority(), commonPath).toString();
}
public static FileSelection create(DrillFileSystem fs, String parent, String path) throws IOException {