You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/15 08:42:47 UTC

[01/17] drill git commit: Add measurement of time taken to start a thread in timed runnable

Repository: drill
Updated Branches:
  refs/heads/master 5c4a9b212 -> d8b197596


Add measurement of time taken to start a thread in timed runnable


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c5f1c83f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c5f1c83f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c5f1c83f

Branch: refs/heads/master
Commit: c5f1c83f26e65ab71955898e875fdf2c3f681953
Parents: 5c4a9b2
Author: Parth Chandra <pa...@apache.org>
Authored: Wed May 13 15:42:06 2015 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Thu May 14 19:53:32 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/store/TimedRunnable.java   | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c5f1c83f/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
index 240aaef..5a35aff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
@@ -44,12 +44,14 @@ public abstract class TimedRunnable<V> implements Runnable {
   private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
 
   private volatile Exception e;
+  private volatile long threadStart;
   private volatile long timeNanos;
   private volatile V value;
 
   @Override
   public final void run() {
     long start = System.nanoTime();
+    threadStart=start;
     try{
       value = runInner();
     }catch(Exception e){
@@ -62,6 +64,9 @@ public abstract class TimedRunnable<V> implements Runnable {
   protected abstract V runInner() throws Exception ;
   protected abstract IOException convertToIOException(Exception e);
 
+  public long getThreadStart(){
+    return threadStart;
+  }
   public long getTimeSpentNanos(){
     return timeNanos;
   }
@@ -111,7 +116,7 @@ public abstract class TimedRunnable<V> implements Runnable {
    */
   public static <V> List<V> run(final String activity, final Logger logger, final List<TimedRunnable<V>> runnables, int parallelism) throws IOException {
     Stopwatch watch = new Stopwatch().start();
-
+    long timedRunnableStart=System.nanoTime();
     if(runnables.size() == 1){
       parallelism = 1;
       runnables.get(0).run();
@@ -158,6 +163,10 @@ public abstract class TimedRunnable<V> implements Runnable {
     long sum = 0;
     long max = 0;
     long count = 0;
+    // measure thread creation times
+    long earliestStart=Long.MAX_VALUE;
+    long latestStart=0;
+    long totalStart=0;
     IOException excep = null;
     for(final TimedRunnable<V> reader : runnables){
       try{
@@ -165,6 +174,9 @@ public abstract class TimedRunnable<V> implements Runnable {
         sum += reader.getTimeSpentNanos();
         count++;
         max = Math.max(max, reader.getTimeSpentNanos());
+        earliestStart=Math.min(earliestStart, reader.getThreadStart() - timedRunnableStart);
+        latestStart=Math.max(latestStart, reader.getThreadStart()-timedRunnableStart);
+        totalStart+=latestStart=Math.max(latestStart, reader.getThreadStart()-timedRunnableStart);
       }catch(IOException e){
         if(excep == null){
           excep = e;
@@ -176,11 +188,16 @@ public abstract class TimedRunnable<V> implements Runnable {
 
     if(logger.isInfoEnabled()){
       double avg = (sum/1000.0/1000.0)/(count*1.0d);
+      double avgStart = (totalStart/1000.0)/(count*1.0d);
 
       logger.info(
           String.format("%s: Executed %d out of %d using %d threads. "
               + "Time: %dms total, %fms avg, %dms max.",
               activity, count, runnables.size(), parallelism, watch.elapsed(TimeUnit.MILLISECONDS), avg, max/1000/1000));
+      logger.info(
+              String.format("%s: Executed %d out of %d using %d threads. "
+                              + "Earliest start: %f \u03BCs, Latest start: %f \u03BCs, Average start: %f \u03BCs .",
+                      activity, count, runnables.size(), parallelism, earliestStart/1000.0, latestStart/1000.0, avgStart));
     }
 
     if(excep != null) {


[04/17] drill git commit: DRILL-3072: Update root fragment to not modify the Foreman state directly.

Posted by ja...@apache.org.
DRILL-3072: Update root fragment to not modify the Foreman state directly.

Instead, use the RPC mechanism to send and receive status updates.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4dcb3e75
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4dcb3e75
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4dcb3e75

Branch: refs/heads/master
Commit: 4dcb3e75643b71daa7f458e1824ac7eb7fc10cde
Parents: e58a306
Author: vkorukanti <ve...@gmail.com>
Authored: Wed May 13 19:42:24 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:52 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/work/foreman/Foreman.java |  9 ++--
 .../drill/exec/work/foreman/QueryManager.java   | 47 +++++---------------
 2 files changed, 17 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4dcb3e75/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 6840cf3..5d07b49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -205,7 +205,7 @@ public class Foreman implements Runnable {
     // resume all pauses through query context
     queryContext.getExecutionControls().unpauseAll();
     // resume all pauses through all fragment contexts
-    queryManager.unpauseExecutingFragments(drillbitContext, rootRunner);
+    queryManager.unpauseExecutingFragments(drillbitContext);
   }
 
   /**
@@ -810,7 +810,7 @@ public class Foreman implements Runnable {
           assert exception == null;
           queryManager.markEndTime();
           recordNewState(QueryState.CANCELLATION_REQUESTED);
-          queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+          queryManager.cancelExecutingFragments(drillbitContext);
           foremanResult.setCompleted(QueryState.CANCELED);
           /*
            * We don't close the foremanResult until we've gotten
@@ -833,7 +833,7 @@ public class Foreman implements Runnable {
           assert exception != null;
           queryManager.markEndTime();
           recordNewState(QueryState.FAILED);
-          queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+          queryManager.cancelExecutingFragments(drillbitContext);
           foremanResult.setFailed(exception);
           foremanResult.close();
           return;
@@ -934,7 +934,8 @@ public class Foreman implements Runnable {
 
     queryManager.addFragmentStatusTracker(rootFragment, true);
 
-    rootRunner = new FragmentExecutor(rootContext, rootFragment, queryManager.newRootStatusHandler(rootContext),
+    rootRunner = new FragmentExecutor(rootContext, rootFragment,
+        queryManager.newRootStatusHandler(rootContext, drillbitContext),
         rootOperator);
     final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4dcb3e75/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index eed4e17..71b77c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.sys.PStore;
@@ -53,6 +54,7 @@ import org.apache.drill.exec.work.WorkManager;
 import org.apache.drill.exec.work.foreman.Foreman.StateListener;
 import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
 import org.apache.drill.exec.work.fragment.StatusReporter;
 
 import com.carrotsearch.hppc.IntObjectOpenHashMap;
@@ -176,17 +178,15 @@ public class QueryManager {
 
   /**
    * Stop all fragments with currently *known* active status (active as in SENDING, AWAITING_ALLOCATION, RUNNING).
-   * (1) Root fragment
-   *    (a) If the root is pending, delegate the cancellation to local work bus.
-   *    (b) If the root is running, cancel the fragment directly.
    *
    * For the actual cancel calls for intermediate and leaf fragments, see
    * {@link org.apache.drill.exec.work.batch.ControlMessageHandler#cancelFragment}
+   * (1) Root fragment: pending or running, send the cancel signal through a tunnel.
    * (2) Intermediate fragment: pending or running, send the cancel signal through a tunnel (for local and remote
    *    fragments). The actual cancel is done by delegating the cancel to the work bus.
    * (3) Leaf fragment: running, send the cancel signal through a tunnel. The cancel is done directly.
    */
-  void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
+  void cancelExecutingFragments(final DrillbitContext drillbitContext) {
     final Controller controller = drillbitContext.getController();
     for(final FragmentData data : fragmentDataSet) {
       switch(data.getState()) {
@@ -194,19 +194,10 @@ public class QueryManager {
       case AWAITING_ALLOCATION:
       case RUNNING:
         final FragmentHandle handle = data.getHandle();
-        if (rootRunner.getContext().getHandle().equals(handle)) {
-          // Case 1.a: pending root is in the work bus. Delegate the cancel to the work bus.
-          final boolean removed = drillbitContext.getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
-          // Case 1.b: running root. Cancel directly.
-          if (!removed) {
-            rootRunner.cancel();
-          }
-        } else {
-          final DrillbitEndpoint endpoint = data.getEndpoint();
-          // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
-          controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
+        final DrillbitEndpoint endpoint = data.getEndpoint();
+        // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
+        controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
             SignalListener.Signal.CANCEL), handle);
-        }
         break;
 
       case FINISHED:
@@ -221,13 +212,9 @@ public class QueryManager {
 
   /**
    * Sends a resume signal to all fragments, regardless of their state, since the fragment might have paused before
-   * sending any message. Resume the root fragment directly and all other (local and remote) fragments through the
-   * control tunnel.
+   * sending any message. Resume all fragments through the control tunnel.
    */
-  void unpauseExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
-    if (rootRunner != null) {
-      rootRunner.unpause();
-    }
+  void unpauseExecutingFragments(final DrillbitContext drillbitContext) {
     final Controller controller = drillbitContext.getController();
     for(final FragmentData data : fragmentDataSet) {
       final DrillbitEndpoint endpoint = data.getEndpoint();
@@ -447,19 +434,9 @@ public class QueryManager {
     }
   }
 
-  public StatusReporter newRootStatusHandler(final FragmentContext context) {
-    return new RootStatusReporter(context);
-  }
-
-  private class RootStatusReporter extends AbstractStatusReporter {
-    private RootStatusReporter(final FragmentContext context) {
-      super(context);
-    }
-
-    @Override
-    protected void statusChange(final FragmentHandle handle, final FragmentStatus status) {
-      fragmentStatusListener.statusUpdate(status);
-    }
+  public StatusReporter newRootStatusHandler(final FragmentContext context, final DrillbitContext dContext) {
+    final ControlTunnel tunnel = dContext.getController().getTunnel(foreman.getQueryContext().getCurrentEndpoint());
+    return new NonRootStatusReporter(context, tunnel);
   }
 
   public FragmentStatusListener getFragmentStatusListener(){


[14/17] drill git commit: DRILL-3089: Revert to 2 forked tests and allow override from the command line

Posted by ja...@apache.org.
DRILL-3089: Revert to 2 forked tests and allow override from the command line


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7c782444
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7c782444
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7c782444

Branch: refs/heads/master
Commit: 7c7824443d8a6621bfe76f384c0598b511fb25b8
Parents: 6958246
Author: Aditya Kishore <ad...@apache.org>
Authored: Thu May 14 16:11:08 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:18:02 2015 -0700

----------------------------------------------------------------------
 pom.xml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7c782444/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 678d201..e4b7bb4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,6 +33,7 @@
     <proto.cas.path>${project.basedir}/src/main/protobuf/</proto.cas.path>
     <dep.junit.version>4.11</dep.junit.version>
     <dep.slf4j.version>1.7.6</dep.slf4j.version>
+    <forkCount>2</forkCount>
     <parquet.version>1.6.0rc3-drill-r0.3</parquet.version>
   </properties>
 
@@ -370,7 +371,7 @@
               -Dorg.apache.drill.exec.server.Drillbit.system_options="org.apache.drill.exec.compile.ClassTransformer.scalar_replacement=on"
               -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=3072M
               -XX:+CMSClassUnloadingEnabled -ea</argLine>
-            <forkCount>1C</forkCount>
+            <forkCount>${forkCount}</forkCount>
             <reuseForks>true</reuseForks>
             <additionalClasspathElements>
               <additionalClasspathElement>./exec/jdbc/src/test/resources/storage-plugins.json</additionalClasspathElement>


[12/17] drill git commit: Fix PartitionSenderRootExec possible memory leak.

Posted by ja...@apache.org.
Fix PartitionSenderRootExec possible memory leak.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/62a73bcd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/62a73bcd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/62a73bcd

Branch: refs/heads/master
Commit: 62a73bcd82464b0a48a234e09040ed069033b848
Parents: aaf9fb8
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 21:55:44 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:14:59 2015 -0700

----------------------------------------------------------------------
 .../PartitionSenderRootExec.java                | 41 +++++++++++++-------
 1 file changed, 26 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/62a73bcd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 1872a51..31fc160 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -230,25 +230,36 @@ public class PartitionSenderRootExec extends BaseRootExec {
     final List<Partitioner> subPartitioners = createClassInstances(actualPartitions);
     int startIndex = 0;
     int endIndex = 0;
-    for (int i = 0; i < actualPartitions; i++) {
-      startIndex = endIndex;
-      endIndex = (i < actualPartitions - 1 ) ? startIndex + divisor : outGoingBatchCount;
-      if ( i < longTail ) {
-        endIndex++;
+
+    boolean success = false;
+    try {
+      for (int i = 0; i < actualPartitions; i++) {
+        startIndex = endIndex;
+        endIndex = (i < actualPartitions - 1) ? startIndex + divisor : outGoingBatchCount;
+        if (i < longTail) {
+          endIndex++;
+        }
+        final OperatorStats partitionStats = new OperatorStats(stats, true);
+        subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
+            startIndex, endIndex);
+      }
+
+      synchronized (this) {
+        partitioner = new PartitionerDecorator(subPartitioners, stats, context);
+        for (int index = 0; index < terminations.size(); index++) {
+          partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
+        }
+        terminations.clear();
       }
-      final OperatorStats partitionStats = new OperatorStats(stats, true);
-      subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
-        startIndex, endIndex);
-    }
 
-    synchronized(this){
-      partitioner = new PartitionerDecorator(subPartitioners, stats, context);
-      for (int index = 0; index < terminations.size(); index++) {
-        partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
+      success = true;
+    } finally {
+      if (!success) {
+        for (Partitioner p : subPartitioners) {
+          p.clear();
+        }
       }
-      terminations.clear();
     }
-
   }
 
   private List<Partitioner> createClassInstances(int actualPartitions) throws SchemaChangeException {


[05/17] drill git commit: DRILL-3063: TestQueriesOnLargeFile leaks memory with 16M limit Changed the cleanup handling at the end of ImplCreator.getExec(), and handle the newly returned null value in FragmentExecutor.run().

Posted by ja...@apache.org.
DRILL-3063: TestQueriesOnLargeFile leaks memory with 16M limit
Changed the cleanup handling at the end of ImplCreator.getExec(), and
handle the newly returned null value in FragmentExecutor.run().


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e58a3063
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e58a3063
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e58a3063

Branch: refs/heads/master
Commit: e58a30638fd7fc700b1e93d05503ddb7a7b76643
Parents: bef60f5
Author: Chris Westin <cw...@yahoo.com>
Authored: Thu May 14 15:53:05 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:52 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/physical/impl/ImplCreator.java   | 13 ++++++-------
 .../drill/exec/work/fragment/FragmentExecutor.java     |  3 +++
 2 files changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e58a3063/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 77ca0f5..66558be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -77,7 +77,6 @@ public class ImplCreator {
     Stopwatch watch = new Stopwatch();
     watch.start();
 
-    boolean success = false;
     try {
       final RootExec rootExec = creator.getRootExec(root, context);
       // skip over this for SimpleRootExec (testing)
@@ -91,15 +90,15 @@ public class ImplCreator {
             "The provided fragment did not have a root node that correctly created a RootExec value.");
       }
 
-      success = true;
       return rootExec;
-    } finally {
-      if (!success) {
-        for(final CloseableRecordBatch crb : creator.getOperators()) {
-          AutoCloseables.close(crb, logger);
-        }
+    } catch(Exception e) {
+      context.fail(e);
+      for(final CloseableRecordBatch crb : creator.getOperators()) {
+        AutoCloseables.close(crb, logger);
       }
     }
+
+    return null;
   }
 
   /** Create RootExec and its children (RecordBatches) for given FragmentRoot */

http://git-wip-us.apache.org/repos/asf/drill/blob/e58a3063/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 8c49d68..ffb76b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -226,6 +226,9 @@ public class FragmentExecutor implements Runnable {
               drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
 
           root = ImplCreator.getExec(fragmentContext, rootOperator);
+          if (root == null) {
+            return;
+          }
 
           clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
           updateState(FragmentState.RUNNING);


[06/17] drill git commit: Update Drill to use latest sqlline

Posted by ja...@apache.org.
Update Drill to use latest sqlline


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3c879974
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3c879974
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3c879974

Branch: refs/heads/master
Commit: 3c879974837ce78c5041f3492519e9ea94fc4a5a
Parents: eca6cd7
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 09:16:41 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:52 2015 -0700

----------------------------------------------------------------------
 distribution/src/resources/sqlline | 4 +++-
 pom.xml                            | 4 ++--
 2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3c879974/distribution/src/resources/sqlline
----------------------------------------------------------------------
diff --git a/distribution/src/resources/sqlline b/distribution/src/resources/sqlline
index b1b9014..2c9c783 100644
--- a/distribution/src/resources/sqlline
+++ b/distribution/src/resources/sqlline
@@ -39,7 +39,9 @@ bin=`cd "$bin">/dev/null; pwd`
 # is.) 
 # Put our property specification before previous value of DRILL_SHELL_JAVA_OPTS
 # so that it can still be overridden via DRILL_SHELL_JAVA_OPTS.
-DRILL_SHELL_JAVA_OPTS="-Dsqlline.isolation=TRANSACTION_NONE $DRILL_SHELL_JAVA_OPTS"
+#
+# This is not currently needed as the new SQLLine we are using doesn't isolate.
+# DRILL_SHELL_JAVA_OPTS="-Dsqlline.isolation=TRANSACTION_NONE $DRILL_SHELL_JAVA_OPTS"
 
 DRILL_SHELL_JAVA_OPTS="$DRILL_SHELL_JAVA_OPTS -Dlog.path=$DRILL_LOG_DIR/sqlline.log -Dlog.query.path=$DRILL_LOG_DIR/sqlline_queries.json"
 

http://git-wip-us.apache.org/repos/asf/drill/blob/3c879974/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cf17a79..678d201 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1032,7 +1032,7 @@
           <dependency>
             <groupId>sqlline</groupId>
             <artifactId>sqlline</artifactId>
-            <version>1.1.6</version>
+            <version>1.1.9-drill-r3</version>
           </dependency>
 
           <dependency>
@@ -1370,7 +1370,7 @@
           <dependency>
             <groupId>sqlline</groupId>
             <artifactId>sqlline</artifactId>
-            <version>1.1.6</version>
+            <version>1.1.9-drill-r3</version>
           </dependency>
           <!-- Test Dependencies -->
           <dependency>


[02/17] drill git commit: DRILL-3061: Fix memory leaks in TestDrillbitResilience

Posted by ja...@apache.org.
DRILL-3061: Fix memory leaks in TestDrillbitResilience

- fixes a race condition in WorkEventBus
- marks TestDrillbitResilience with @Ignore


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/bef60f58
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/bef60f58
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/bef60f58

Branch: refs/heads/master
Commit: bef60f58a79b6bb41c06d59897577d7f7017c526
Parents: c5f1c83
Author: adeneche <ad...@gmail.com>
Authored: Tue May 12 10:53:32 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:43 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/rpc/control/WorkEventBus.java    |  16 +-
 .../apache/drill/exec/rpc/data/DataServer.java  |   2 +-
 .../org/apache/drill/exec/ZookeeperHelper.java  |  14 +-
 .../exec/server/TestDrillbitResilience.java     | 190 ++++++++++++++++---
 4 files changed, 190 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index ddd7828..3e461ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -90,16 +90,16 @@ public class WorkEventBus {
   }
 
   public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
-    // Check if this was a recently finished (completed or cancelled) fragment.  If so, throw away message.
-    if (recentlyFinishedFragments.asMap().containsKey(handle)) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
+    synchronized (this) {
+      // Check if this was a recently finished (completed or cancelled) fragment.  If so, throw away message.
+      if (recentlyFinishedFragments.asMap().containsKey(handle)) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
+        }
+        return null;
       }
-      return null;
-    }
 
-    // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
-    synchronized (this) {
+      // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
       final FragmentManager m = managers.get(handle);
       if (m != null) {
         return m;

http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 0d4077e..061ddcb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -150,7 +150,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
       logger.error("Failure while getting fragment manager. {}",
           QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(),
               fragmentBatch.getReceivingMajorFragmentId(),
-              fragmentBatch.getReceivingMinorFragmentIdList()));
+              fragmentBatch.getReceivingMinorFragmentIdList()), e);
       ack.clear();
       sender.send(new Response(RpcType.ACK, Acks.FAIL));
     } finally {

http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
index a5db81d..630c81b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
@@ -46,10 +46,22 @@ public class ZookeeperHelper {
    * <p>Will create a "test-data" directory for Zookeeper's use if one doesn't already exist.
    */
   public ZookeeperHelper() {
+    this(false);
+  }
+
+  /**
+   * Constructor.
+   *
+   * <p>Will create a "test-data" directory for Zookeeper's use if one doesn't already exist.
+   * @param failureInCancelled pass true if you want failures in cancelled fragments to be reported as failures
+   */
+  public ZookeeperHelper(boolean failureInCancelled) {
     final Properties overrideProps = new Properties();
     // Forced to disable this, because currently we leak memory which is a known issue for query cancellations.
     // Setting this causes unittests to fail.
-    // overrideProps.setProperty(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS, "true");
+    if (failureInCancelled) {
+      overrideProps.setProperty(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS, "true");
+    }
     config = DrillConfig.create(overrideProps);
     zkUrl = config.getString(ExecConstants.ZK_CONNECTION);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 8552ec1..696aed8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -30,8 +30,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.math3.util.Pair;
+import org.apache.drill.BaseTestQuery;
 import org.apache.drill.QueryTestUtil;
 import org.apache.drill.SingleRowListener;
 import org.apache.drill.common.AutoCloseables;
@@ -48,11 +48,9 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.physical.impl.ScreenCreator;
 import org.apache.drill.exec.physical.impl.SingleSenderCreator.SingleSenderRootExec;
-import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch;
 import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
-import org.apache.drill.exec.physical.impl.union.UnionAllRecordBatch;
 import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
 import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
@@ -88,11 +86,14 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Test how resilient drillbits are to throwing exceptions during various phases of query
  * execution by injecting exceptions at various points and to cancellations in various phases.
  * The test cases are mentioned in DRILL-2383.
  */
+@Ignore
 public class TestDrillbitResilience extends DrillTest {
   private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TestDrillbitResilience.class);
 
@@ -164,7 +165,7 @@ public class TestDrillbitResilience extends DrillTest {
     // turn off the HTTP server to avoid port conflicts between the drill bits
     System.setProperty(ExecConstants.HTTP_ENABLE, "false");
 
-    zkHelper = new ZookeeperHelper();
+    zkHelper = new ZookeeperHelper(true);
     zkHelper.startZookeeper(1);
 
     // use a non-null service set so that the drillbits can use port hunting
@@ -269,7 +270,6 @@ public class TestDrillbitResilience extends DrillTest {
     assertTrue("There should not be any errors when checking if Drillbits are OK.", errorList.isEmpty());
   }
 
-  @SuppressWarnings("static-method")
   @After
   public void checkDrillbits() {
     clearAllInjections(); // so that the drillbit check itself doesn't trigger anything
@@ -355,6 +355,8 @@ public class TestDrillbitResilience extends DrillTest {
 
   @Test
   public void settingNoopInjectionsAndQuery() {
+    final long before = countAllocatedMemory();
+
     final String controls = createSingleExceptionOnBit(getClass(), "noop", RuntimeException.class, DRILLBIT_BETA);
     setControls(controls);
     try {
@@ -362,6 +364,9 @@ public class TestDrillbitResilience extends DrillTest {
     } catch (final Exception e) {
       fail(e.getMessage());
     }
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
   /**
@@ -381,16 +386,24 @@ public class TestDrillbitResilience extends DrillTest {
     }
   }
 
-  @SuppressWarnings("static-method")
   @Test
   public void foreman_runTryBeginning() {
+    final long before = countAllocatedMemory();
+
     testForeman("run-try-beginning");
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
-  @SuppressWarnings("static-method")
   @Test
   public void foreman_runTryEnd() {
+    final long before = countAllocatedMemory();
+
     testForeman("run-try-end");
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
   /**
@@ -463,7 +476,7 @@ public class TestDrillbitResilience extends DrillTest {
       }
       result.release();
     }
-  };
+  }
 
   /**
    * Thread that cancels the given query id. After the cancel is acknowledged, the latch is counted down.
@@ -537,7 +550,11 @@ public class TestDrillbitResilience extends DrillTest {
    * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
    */
   private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
-    assertCancelled(controls, TEST_QUERY, listener);
+    assertCancelledWithoutException(controls, listener, TEST_QUERY);
+  }
+
+  private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener, final String query) {
+    assertCancelled(controls, query, listener);
   }
 
   /**
@@ -579,6 +596,9 @@ public class TestDrillbitResilience extends DrillTest {
 
   @Test // To test pause and resume. Test hangs if resume did not happen.
   public void passThrough() {
+    final long before = countAllocatedMemory();
+
+
     final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
       @Override
       public void queryIdArrived(final QueryId queryId) {
@@ -595,11 +615,16 @@ public class TestDrillbitResilience extends DrillTest {
     QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
     assertCompleteState(result, QueryState.COMPLETED);
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
   @Test // Cancellation TC 1: cancel before any result set is returned
   @Ignore // DRILL-3052
   public void cancelBeforeAnyResultsArrive() {
+    final long before = countAllocatedMemory();
+
     final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
 
       @Override
@@ -611,10 +636,15 @@ public class TestDrillbitResilience extends DrillTest {
 
     final String controls = createPauseInjection(Foreman.class, "foreman-ready");
     assertCancelledWithoutException(controls, listener);
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
   @Test // Cancellation TC 2: cancel in the middle of fetching result set
   public void cancelInMiddleOfFetchingResults() {
+    final long before = countAllocatedMemory();
+
     final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
       private boolean cancelRequested = false;
 
@@ -632,11 +662,16 @@ public class TestDrillbitResilience extends DrillTest {
     // skip once i.e. wait for one batch, so that #dataArrived above triggers #cancelAndResume
     final String controls = createPauseInjection(ScreenCreator.class, "sending-data", 1);
     assertCancelledWithoutException(controls, listener);
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
 
   @Test // Cancellation TC 3: cancel after all result set are produced but not all are fetched
   public void cancelAfterAllResultsProduced() {
+    final long before = countAllocatedMemory();
+
     final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
       private int count = 0;
 
@@ -652,10 +687,16 @@ public class TestDrillbitResilience extends DrillTest {
 
     final String controls = createPauseInjection(ScreenCreator.class, "send-complete");
     assertCancelledWithoutException(controls, listener);
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
   @Test // Cancellation TC 4: cancel after everything is completed and fetched
+  @Ignore
   public void cancelAfterEverythingIsCompleted() {
+    final long before = countAllocatedMemory();
+
     final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
       private int count = 0;
 
@@ -671,16 +712,25 @@ public class TestDrillbitResilience extends DrillTest {
 
     final String controls = createPauseInjection(Foreman.class, "foreman-cleanup");
     assertCancelledWithoutException(controls, listener);
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
   @Test // Completion TC 1: success
   public void successfullyCompletes() {
+    final long before = countAllocatedMemory();
+
     final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
     QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
     assertCompleteState(result, QueryState.COMPLETED);
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
+
   /**
    * Given a set of controls, this method ensures TEST_QUERY fails with the given class and desc.
    */
@@ -702,26 +752,41 @@ public class TestDrillbitResilience extends DrillTest {
 
   @Test // Completion TC 2: failed query - before query is executed - while sql parsing
   public void failsWhenParsing() {
+    final long before = countAllocatedMemory();
+
     final String exceptionDesc = "sql-parsing";
     final Class<? extends Throwable> exceptionClass = ForemanSetupException.class;
     final String controls = createSingleException(DrillSqlWorker.class, exceptionDesc, exceptionClass);
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
   @Test // Completion TC 3: failed query - before query is executed - while sending fragments to other drillbits
   public void failsWhenSendingFragments() {
+    final long before = countAllocatedMemory();
+
     final String exceptionDesc = "send-fragments";
     final Class<? extends Throwable> exceptionClass = ForemanException.class;
     final String controls = createSingleException(Foreman.class, exceptionDesc, exceptionClass);
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
   @Test // Completion TC 4: failed query - during query execution
   public void failsDuringExecution() {
+    final long before = countAllocatedMemory();
+
     final String exceptionDesc = "fragment-execution";
     final Class<? extends Throwable> exceptionClass = IOException.class;
     final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass);
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
   /**
@@ -730,8 +795,13 @@ public class TestDrillbitResilience extends DrillTest {
    */
   @Test
   public void testInterruptingBlockedMergingRecordBatch() {
+    final long before = countAllocatedMemory();
+
     final String control = createPauseInjection(MergingRecordBatch.class, "waiting-for-data", 1);
     testInterruptingBlockedFragmentsWaitingForData(control);
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
   /**
@@ -740,8 +810,13 @@ public class TestDrillbitResilience extends DrillTest {
    */
   @Test
   public void testInterruptingBlockedUnorderedReceiverBatch() {
+    final long before = countAllocatedMemory();
+
     final String control = createPauseInjection(UnorderedReceiverBatch.class, "waiting-for-data", 1);
     testInterruptingBlockedFragmentsWaitingForData(control);
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
   private static void testInterruptingBlockedFragmentsWaitingForData(final String control) {
@@ -769,22 +844,27 @@ public class TestDrillbitResilience extends DrillTest {
       setSessionOption(HASHAGG.getOptionName(), "true");
       setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(), "6");
 
+      final long before = countAllocatedMemory();
+
       final String controls = "{\"injections\" : ["
-          + "{"
-          + "\"type\" : \"latch\","
-          + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
-          + "\"desc\" : \"partitioner-sender-latch\""
-          + "},"
-          + "{"
-          + "\"type\" : \"pause\","
-          + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
-          + "\"desc\" : \"wait-for-fragment-interrupt\","
-          + "\"nSkip\" : 1"
-          + "}" +
-          "]}";
+        + "{"
+        + "\"type\" : \"latch\","
+        + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
+        + "\"desc\" : \"partitioner-sender-latch\""
+        + "},"
+        + "{"
+        + "\"type\" : \"pause\","
+        + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
+        + "\"desc\" : \"wait-for-fragment-interrupt\","
+        + "\"nSkip\" : 1"
+        + "}" +
+        "]}";
 
       final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
       assertCancelled(controls, query, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+
+      final long after = countAllocatedMemory();
+      assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
     } finally {
       setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
       setSessionOption(HASHAGG.getOptionName(), HASHAGG.getDefault().bool_val.toString());
@@ -795,9 +875,75 @@ public class TestDrillbitResilience extends DrillTest {
 
   @Test
   public void testInterruptingWhileFragmentIsBlockedInAcquiringSendingTicket() throws Exception {
+
+    final long before = countAllocatedMemory();
+
     final String control =
-        createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1);
+      createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1);
     assertCancelled(control, TEST_QUERY, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+  }
+
+  @Test
+  public void memoryLeaksWhenCancelled() {
+    setSessionOption(SLICE_TARGET, "10");
+
+    final long before = countAllocatedMemory();
+
+    final String controls = createPauseInjection(ScreenCreator.class, "sending-data", 1);
+    String query = null;
+    try {
+      query = BaseTestQuery.getFile("queries/tpch/09.sql");
+    } catch (final IOException e) {
+      fail("Failed to get query file: " + e);
+    }
+
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
+      private boolean cancelRequested = false;
+
+      @Override
+      public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+        if (!cancelRequested) {
+          check(queryId != null, "Query id should not be null, since we have waited long enough.");
+          cancelAndResume();
+          cancelRequested = true;
+        }
+        result.release();
+      }
+    };
+
+    assertCancelledWithoutException(controls, listener, query.substring(0, query.length() - 1));
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+
+    setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+  }
+
+  @Test
+  public void memoryLeaksWhenFailed() {
+    setSessionOption(SLICE_TARGET, "10");
+
+    final long before = countAllocatedMemory();
+
+    final String exceptionDesc = "fragment-execution";
+    final Class<? extends Throwable> exceptionClass = IOException.class;
+    final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass);
+    String query = null;
+    try {
+      query = BaseTestQuery.getFile("queries/tpch/09.sql");
+    } catch (final IOException e) {
+      fail("Failed to get query file: " + e);
+    }
+
+    assertFailsWithException(controls, exceptionClass, exceptionDesc, query.substring(0, query.length() - 1));
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+
+    setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
   }
 
   @Test // DRILL-3065


[09/17] drill git commit: In the case of extended RPC message handling, write warning to log

Posted by ja...@apache.org.
In the case of extended RPC message handling, write warning to log


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7fccf7e1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7fccf7e1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7fccf7e1

Branch: refs/heads/master
Commit: 7fccf7e1fd9f108edbd5bf177339173d900b89a3
Parents: 3c87997
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 12:19:46 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:53 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/drill/exec/rpc/RpcBus.java  | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7fccf7e1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 92ce312..1a23724 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -31,12 +31,14 @@ import java.io.Closeable;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.MessageLite;
@@ -203,8 +205,10 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
         logger.debug("Received message {}", msg);
       }
       final Channel channel = connection.getChannel();
+      final Stopwatch watch = new Stopwatch().start();
 
       try{
+
         switch (msg.mode) {
         case REQUEST: {
           // handle message and ack.
@@ -270,6 +274,13 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
           throw new UnsupportedOperationException();
         }
       } finally {
+        long time = watch.elapsed(TimeUnit.MILLISECONDS);
+        long delayThreshold = Integer.parseInt(System.getProperty("drill.exec.rpcDelayWarning", "500"));
+        if (time > delayThreshold) {
+          logger.warn(String.format(
+              "Message of mode %s of rpc type %d took longer than %dms.  Actual duration was %dms.",
+              msg.mode, msg.rpcType, delayThreshold, time));
+        }
         msg.release();
       }
     }


[03/17] drill git commit: Add convenience drill-* executables for sqlline. Rename drill_dumpcat to dumpcat

Posted by ja...@apache.org.
Add convenience drill-* executables for sqlline.  Rename drill_dumpcat to dumpcat


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/eca6cd71
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/eca6cd71
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/eca6cd71

Branch: refs/heads/master
Commit: eca6cd7136557273e4ff933f12dc3f24756a9cda
Parents: 4dcb3e7
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 10:13:35 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:52 2015 -0700

----------------------------------------------------------------------
 distribution/src/assemble/bin.xml          | 17 ++++++++++++++++-
 distribution/src/resources/drill-conf      | 22 ++++++++++++++++++++++
 distribution/src/resources/drill-embedded  | 22 ++++++++++++++++++++++
 distribution/src/resources/drill-localhost | 22 ++++++++++++++++++++++
 distribution/src/resources/drill_dumpcat   | 25 -------------------------
 distribution/src/resources/dumpcat         | 25 +++++++++++++++++++++++++
 6 files changed, 107 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/eca6cd71/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index 0576fd2..684ff19 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -245,6 +245,21 @@
       <outputDirectory>bin</outputDirectory>
     </file>
     <file>
+      <source>src/resources/drill-conf</source>
+      <fileMode>0755</fileMode>
+      <outputDirectory>bin</outputDirectory>
+    </file>
+    <file>
+      <source>src/resources/drill-embedded</source>
+      <fileMode>0755</fileMode>
+      <outputDirectory>bin</outputDirectory>
+    </file>
+    <file>
+      <source>src/resources/drill-localhost</source>
+      <fileMode>0755</fileMode>
+      <outputDirectory>bin</outputDirectory>
+    </file>
+    <file>
       <source>src/resources/drill-config.sh</source>
       <fileMode>0755</fileMode>
       <outputDirectory>bin</outputDirectory>
@@ -264,7 +279,7 @@
       <outputDirectory>bin</outputDirectory>
     </file>
     <file>
-      <source>src/resources/drill_dumpcat</source>
+      <source>src/resources/dumpcat</source>
       <fileMode>0755</fileMode>
       <outputDirectory>bin</outputDirectory>
     </file>

http://git-wip-us.apache.org/repos/asf/drill/blob/eca6cd71/distribution/src/resources/drill-conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-conf b/distribution/src/resources/drill-conf
new file mode 100755
index 0000000..00e0d25
--- /dev/null
+++ b/distribution/src/resources/drill-conf
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=`cd "$bin">/dev/null; pwd`
+
+# Start a sqlline session using the connection settings from the configuration file
+exec ${bin}/sqlline -u "jdbc:drill:" "$@"

http://git-wip-us.apache.org/repos/asf/drill/blob/eca6cd71/distribution/src/resources/drill-embedded
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-embedded b/distribution/src/resources/drill-embedded
new file mode 100755
index 0000000..d66ba7b
--- /dev/null
+++ b/distribution/src/resources/drill-embedded
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=`cd "$bin">/dev/null; pwd`
+
+# Start a sqlline session with an embedded Drillbit
+exec ${bin}/sqlline -u "jdbc:drill:zk=local" "$@"

http://git-wip-us.apache.org/repos/asf/drill/blob/eca6cd71/distribution/src/resources/drill-localhost
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-localhost b/distribution/src/resources/drill-localhost
new file mode 100755
index 0000000..454045c
--- /dev/null
+++ b/distribution/src/resources/drill-localhost
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=`cd "$bin">/dev/null; pwd`
+
+# Start a sqlline session by connecting to a locally running Drillbit
+exec ${bin}/sqlline -u "jdbc:drill:drillbit=localhost" "$@"

http://git-wip-us.apache.org/repos/asf/drill/blob/eca6cd71/distribution/src/resources/drill_dumpcat
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill_dumpcat b/distribution/src/resources/drill_dumpcat
deleted file mode 100755
index a2ea4d3..0000000
--- a/distribution/src/resources/drill_dumpcat
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bin=`dirname "${BASH_SOURCE-$0}"`
-bin=`cd "$bin">/dev/null; pwd`
-
-. "$bin"/drill-config.sh
-
-DRILL_SHELL_JAVA_OPTS="$DRILL_SHELL_JAVA_OPTS -Dlog.path=$DRILL_LOG_DIR/drill_dumpcat.log"
-
-exec $JAVA $DRILL_SHELL_JAVA_OPTS $DRILL_JAVA_OPTS -cp $CP org.apache.drill.exec.client.DumpCat $@

http://git-wip-us.apache.org/repos/asf/drill/blob/eca6cd71/distribution/src/resources/dumpcat
----------------------------------------------------------------------
diff --git a/distribution/src/resources/dumpcat b/distribution/src/resources/dumpcat
new file mode 100755
index 0000000..a2ea4d3
--- /dev/null
+++ b/distribution/src/resources/dumpcat
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=`cd "$bin">/dev/null; pwd`
+
+. "$bin"/drill-config.sh
+
+DRILL_SHELL_JAVA_OPTS="$DRILL_SHELL_JAVA_OPTS -Dlog.path=$DRILL_LOG_DIR/drill_dumpcat.log"
+
+exec $JAVA $DRILL_SHELL_JAVA_OPTS $DRILL_JAVA_OPTS -cp $CP org.apache.drill.exec.client.DumpCat $@


[08/17] drill git commit: Update sys.options table to only show options available via ALTER SYSTEM or ALTER SESSION. Move BOOT options to sys.boot table.

Posted by ja...@apache.org.
Update sys.options table to only show options available via ALTER SYSTEM or ALTER SESSION.  Move BOOT options to sys.boot table.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/22d6465e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/22d6465e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/22d6465e

Branch: refs/heads/master
Commit: 22d6465e0037b84bd5749a2b3654de1de21bb9de
Parents: 7fccf7e
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 13:41:26 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:53 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/server/options/OptionValue.java  |   7 +-
 .../drill/exec/store/sys/OptionIterator.java    | 126 +++++++++++++++++++
 .../drill/exec/store/sys/SystemTable.java       |  84 ++-----------
 3 files changed, 143 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/22d6465e/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
index 8735fd6..487553e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.google.common.base.Preconditions;
 
 @JsonInclude(Include.NON_NULL)
-public class OptionValue {
+public class OptionValue implements Comparable<OptionValue> {
 
   public static enum OptionType {
     BOOT, SYSTEM, SESSION, QUERY
@@ -179,6 +179,11 @@ public class OptionValue {
   }
 
   @Override
+  public int compareTo(OptionValue o) {
+    return this.name.compareTo(o.name);
+  }
+
+  @Override
   public String toString() {
     return "OptionValue [type=" + type + ", name=" + name + ", value=" + getValue() + "]";
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/22d6465e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java
new file mode 100644
index 0000000..3e42436
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.server.options.DrillConfigIterator;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.server.options.OptionValue.Kind;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+public class OptionIterator implements Iterator<Object> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OptionIterator.class);
+
+  enum Mode {
+    BOOT, SYS_SESS, BOTH
+  };
+
+  private final OptionManager fragmentOptions;
+  private final Iterator<OptionValue> mergedOptions;
+
+  public OptionIterator(FragmentContext context, Mode mode){
+    final DrillConfigIterator configOptions = new DrillConfigIterator(context.getConfig());
+    fragmentOptions = context.getOptions();
+    final Iterator<OptionValue> optionList;
+    switch(mode){
+    case BOOT:
+      optionList = configOptions.iterator();
+      break;
+    case SYS_SESS:
+      optionList = fragmentOptions.iterator();
+      break;
+    default:
+      optionList = Iterators.concat(configOptions.iterator(), fragmentOptions.iterator());
+    }
+
+    List<OptionValue> values = Lists.newArrayList(optionList);
+    Collections.sort(values);
+    mergedOptions = values.iterator();
+
+  }
+
+  @Override
+  public boolean hasNext() {
+    return mergedOptions.hasNext();
+  }
+
+  @Override
+  public OptionValueWrapper next() {
+    final OptionValue value = mergedOptions.next();
+    final Status status;
+    if (value.type == OptionType.BOOT) {
+      status = Status.BOOT;
+    } else {
+      final OptionValue def = fragmentOptions.getSystemManager().getDefault(value.name);
+      if (value.equals(def)) {
+        status = Status.DEFAULT;
+        } else {
+        status = Status.CHANGED;
+        }
+      }
+    return new OptionValueWrapper(value.name, value.kind, value.type, value.num_val, value.string_val,
+        value.bool_val, value.float_val, status);
+  }
+
+  public static enum Status {
+    BOOT, DEFAULT, CHANGED
+  }
+
+  /**
+   * Wrapper class for OptionValue to add Status
+   */
+  public static class OptionValueWrapper {
+
+
+
+    public final String name;
+    public final Kind kind;
+    public final OptionType type;
+    public final Status status;
+    public final Long num_val;
+    public final String string_val;
+    public final Boolean bool_val;
+    public final Double float_val;
+
+    public OptionValueWrapper(final String name, final Kind kind, final OptionType type, final Long num_val,
+        final String string_val, final Boolean bool_val, final Double float_val,
+        final Status status) {
+      this.name = name;
+      this.kind = kind;
+      this.type = type;
+      this.num_val = num_val;
+      this.string_val = string_val;
+      this.bool_val = bool_val;
+      this.float_val = float_val;
+      this.status = status;
+    }
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/22d6465e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
index cd8af08..1d73001 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -17,17 +17,11 @@
  */
 package org.apache.drill.exec.store.sys;
 
-import com.google.common.collect.Iterators;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.server.options.DrillConfigIterator;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.server.options.OptionValue;
-import org.apache.drill.exec.server.options.OptionValue.Kind;
-import org.apache.drill.exec.server.options.OptionValue.OptionType;
-import org.apache.drill.exec.store.sys.SystemTable.OptionValueWrapper.Status;
-
 import java.util.Iterator;
 
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.sys.OptionIterator.OptionValueWrapper;
+
 /**
  * An enumeration of all tables in Drill's system ("sys") schema.
  * <p>
@@ -41,40 +35,14 @@ public enum SystemTable {
   OPTION("options", false, OptionValueWrapper.class) {
     @Override
     public Iterator<Object> getIterator(final FragmentContext context) {
-      final DrillConfigIterator configOptions = new DrillConfigIterator(context.getConfig());
-      final OptionManager fragmentOptions = context.getOptions();
-      final Iterator<OptionValue> mergedOptions = Iterators.concat(configOptions.iterator(), fragmentOptions.iterator());
-      final Iterator<OptionValueWrapper> optionValues = new Iterator<OptionValueWrapper>() {
-        @Override
-        public boolean hasNext() {
-          return mergedOptions.hasNext();
-        }
-
-        @Override
-        public OptionValueWrapper next() {
-          final OptionValue value = mergedOptions.next();
-          final Status status;
-          if (value.type == OptionType.BOOT) {
-            status = Status.BOOT;
-          } else {
-            final OptionValue def = fragmentOptions.getSystemManager().getDefault(value.name);
-            if (value.equals(def)) {
-              status = Status.DEFAULT;
-            } else {
-              status = Status.CHANGED;
-            }
-          }
-          return new OptionValueWrapper(value.name, value.kind, value.type, value.num_val, value.string_val,
-            value.bool_val, value.float_val, status);
-        }
-
-        @Override
-        public void remove() {
-        }
-      };
-      @SuppressWarnings("unchecked")
-      final Iterator<Object> iterator = (Iterator<Object>) (Object) optionValues;
-      return iterator;
+      return new OptionIterator(context, OptionIterator.Mode.SYS_SESS);
+    }
+  },
+
+  BOOT("boot", false, OptionValueWrapper.class) {
+    @Override
+    public Iterator<Object> getIterator(final FragmentContext context) {
+      return new OptionIterator(context, OptionIterator.Mode.BOOT);
     }
   },
 
@@ -134,35 +102,5 @@ public enum SystemTable {
     return pojoClass;
   }
 
-  /**
-   * Wrapper class for OptionValue to add Status
-   */
-  public static class OptionValueWrapper {
-
-    public static enum Status {
-      BOOT, DEFAULT, CHANGED
-    }
 
-    public final String name;
-    public final Kind kind;
-    public final OptionType type;
-    public final Status status;
-    public final Long num_val;
-    public final String string_val;
-    public final Boolean bool_val;
-    public final Double float_val;
-
-    public OptionValueWrapper(final String name, final Kind kind, final OptionType type, final Long num_val,
-                              final String string_val, final Boolean bool_val, final Double float_val,
-                              final Status status) {
-      this.name = name;
-      this.kind = kind;
-      this.type = type;
-      this.num_val = num_val;
-      this.string_val = string_val;
-      this.bool_val = bool_val;
-      this.float_val = float_val;
-      this.status = status;
-    }
-  }
 }


[13/17] drill git commit: When interrupt terminating RPC bus waitForSendComplete, make sure to release buffers.

Posted by ja...@apache.org.
When an interrupt terminates RPC bus waitForSendComplete, make sure to release buffers.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/69582463
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/69582463
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/69582463

Branch: refs/heads/master
Commit: 69582463c12ecb4d9ed987ceca1e13d17b75d7e0
Parents: 62a73bc
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 21:56:14 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:17:59 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/drill/exec/rpc/RpcBus.java   | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/69582463/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 812b2fd..9ca09a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -28,7 +28,6 @@ import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.Closeable;
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Arrays;
 import java.util.List;
@@ -96,21 +95,21 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
   public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType,
       SEND protobufBody, Class<RECEIVE> clazz, boolean allowInEventLoop, ByteBuf... dataBodies) {
 
-    if (!allowInEventLoop) {
-      if (connection.inEventLoop()) {
-        throw new IllegalStateException("You attempted to send while inside the rpc event thread.  This isn't allowed because sending will block if the channel is backed up.");
-      }
-
-      if (!connection.blockOnNotWritable(listener)) {
-        return;
-      }
-    }
+    Preconditions
+        .checkArgument(
+            allowInEventLoop || !connection.inEventLoop(),
+            "You attempted to send while inside the rpc event thread.  This isn't allowed because sending will block if the channel is backed up.");
 
     ByteBuf pBuffer = null;
     boolean completed = false;
 
     try {
 
+      if (!allowInEventLoop && !connection.blockOnNotWritable(listener)) {
+        // if we're not in the event loop and we're interrupted while blocking, skip sending this message.
+        return;
+      }
+
       assert !Arrays.asList(dataBodies).contains(null);
       assert rpcConfig.checkSend(rpcType, protobufBody.getClass(), clazz);
 


[07/17] drill git commit: DRILL-3081: Populate connection name as late as possible so RPC error messages are reported correctly.

Posted by ja...@apache.org.
DRILL-3081: Populate connection name as late as possible so RPC error messages are reported correctly.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4b0b3a67
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4b0b3a67
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4b0b3a67

Branch: refs/heads/master
Commit: 4b0b3a67ab5e2db2baf34250bdedb174fce648ad
Parents: f0b3671
Author: Parth Chandra <pa...@apache.org>
Authored: Thu May 14 12:27:40 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:53 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/rpc/BasicClient.java  | 15 +++++++--
 .../exec/rpc/BasicClientWithConnection.java     |  1 +
 .../org/apache/drill/exec/rpc/BasicServer.java  | 16 +++++-----
 .../apache/drill/exec/rpc/RemoteConnection.java |  8 +++--
 .../java/org/apache/drill/exec/rpc/RpcBus.java  | 32 +++++++++++++++-----
 .../drill/exec/rpc/RpcExceptionHandler.java     | 13 ++++----
 .../drill/exec/rpc/control/ControlClient.java   |  3 +-
 .../drill/exec/rpc/control/ControlServer.java   |  1 +
 .../apache/drill/exec/rpc/data/DataClient.java  |  1 +
 .../apache/drill/exec/rpc/data/DataServer.java  |  1 +
 .../apache/drill/exec/rpc/user/UserServer.java  |  1 +
 11 files changed, 65 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index a33b370..cf09be3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -33,6 +33,7 @@ import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
+import java.net.SocketAddress;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -46,7 +47,7 @@ import com.google.protobuf.Parser;
 
 public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
     extends RpcBus<T, R> {
-  final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
 
   // The percentage of time that should pass before sending a ping message to ensure server doesn't time us out. For
   // example, if timeout is set to 30 seconds and we set percentage to 0.5, then if no write has happened within 15
@@ -101,7 +102,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
             }
 
             pipe.addLast("message-handler", new InboundHandler(connection));
-            pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
+            pipe.addLast("exception-handler", new RpcExceptionHandler(connection));
           }
         }); //
 
@@ -110,6 +111,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     // }
   }
 
+  public R initRemoteConnection(SocketChannel channel){
+    local=channel.localAddress();
+    remote=channel.remoteAddress();
+    return null;
+  };
+
   private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);
 
   /**
@@ -200,12 +207,14 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
         // So there is no point propagating the interruption as failure immediately.
         long remainingWaitTimeMills = 120000;
         long startTime = System.currentTimeMillis();
-
         // logger.debug("Connection operation finished.  Success: {}", future.isSuccess());
         while(true) {
           try {
             future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
             if (future.isSuccess()) {
+              SocketAddress remote = future.channel().remoteAddress();
+              SocketAddress local = future.channel().localAddress();
+              setAddresses(remote, local);
               // send a handshake on the current thread. This is the only time we will send from within the event thread.
               // We can do this because the connection will not be backed up.
               send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index ab54fa1..c194b5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -51,6 +51,7 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
 
   @Override
   public ServerConnection initRemoteConnection(SocketChannel channel) {
+    super.initRemoteConnection(channel);
     return new ServerConnection(connectionName, channel, alloc);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 6a7bc65..5c04264 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -85,11 +85,11 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
 
             if (rpcMapping.hasTimeout()) {
               pipe.addLast(TIMEOUT_HANDLER,
-                  new LogggingReadTimeoutHandler(connection.getName(), rpcMapping.getTimeout()));
+                  new LogggingReadTimeoutHandler(connection, rpcMapping.getTimeout()));
             }
 
             pipe.addLast("message-handler", new InboundHandler(connection));
-            pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
+            pipe.addLast("exception-handler", new RpcExceptionHandler(connection));
 
             connect = true;
 //            logger.debug("Server connection initialization completed.");
@@ -101,19 +101,19 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
 //     }
   }
 
-  private class LogggingReadTimeoutHandler extends ReadTimeoutHandler {
+  private class LogggingReadTimeoutHandler<C extends RemoteConnection> extends ReadTimeoutHandler {
 
-    private final String name;
+    private final C connection;
     private final int timeoutSeconds;
-    public LogggingReadTimeoutHandler(String name, int timeoutSeconds) {
+    public LogggingReadTimeoutHandler(C connection, int timeoutSeconds) {
       super(timeoutSeconds);
-      this.name = name;
+      this.connection = connection;
       this.timeoutSeconds = timeoutSeconds;
     }
 
     @Override
     protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
-      logger.info("RPC connection {} timed out.  Timeout was set to {} seconds. Closing connection.", name,
+      logger.info("RPC connection {} timed out.  Timeout was set to {} seconds. Closing connection.", connection.getName(),
           timeoutSeconds);
       super.readTimedOut(ctx);
     }
@@ -178,6 +178,8 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
 
   @Override
   public C initRemoteConnection(SocketChannel channel) {
+    local = channel.localAddress();
+    remote = channel.remoteAddress();
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 199569c..30abcc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -33,7 +33,8 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
   private final Channel channel;
   private final WriteManager writeManager;
-  private final String name;
+  private String name;
+  private final String clientName;
 
   public boolean inEventLoop(){
     return channel.eventLoop().inEventLoop();
@@ -42,7 +43,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
   public RemoteConnection(SocketChannel channel, String name) {
     super();
     this.channel = channel;
-    this.name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), name);
+    this.clientName = name;
     this.writeManager = new WriteManager();
     channel.pipeline().addLast(new BackPressureHandler());
     channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@@ -57,6 +58,9 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
   }
 
   public String getName() {
+    if(name == null){
+      name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), clientName);
+    }
     return name;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 1a23724..812b2fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -29,6 +29,7 @@ import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.Closeable;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -67,10 +68,19 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
   protected final RpcConfig rpcConfig;
 
+  protected volatile SocketAddress local;
+  protected volatile SocketAddress remote;
+
+
   public RpcBus(RpcConfig rpcConfig) {
     this.rpcConfig = rpcConfig;
   }
 
+  protected void setAddresses(SocketAddress remote, SocketAddress local){
+    this.remote = remote;
+    this.local = local;
+  }
+
   <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
       SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
     DrillRpcFutureImpl<RECEIVE> rpcFuture = new DrillRpcFutureImpl<RECEIVE>();
@@ -133,21 +143,27 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
   public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> {
 
-    final InetSocketAddress local;
-    final InetSocketAddress remote;
     final C clientConnection;
+    private final Channel channel;
 
-    public ChannelClosedHandler(C clientConnection, InetSocketAddress local, InetSocketAddress remote) {
-      this.local = local;
-      this.remote = remote;
+    public ChannelClosedHandler(C clientConnection, Channel channel) {
+      this.channel = channel;
       this.clientConnection = clientConnection;
     }
 
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
-      String msg = String.format("Channel closed %s <--> %s.", local, remote);
+      String msg;
+      if(local!=null) {
+        msg = String.format("Channel closed %s <--> %s.", local, remote);
+      }else{
+        msg = String.format("Channel closed %s <--> %s.", future.channel().localAddress(), future.channel().remoteAddress());
+      }
+
       if (RpcBus.this.isClient()) {
-        logger.info(String.format(msg));
+        if(local != null) {
+          logger.info(String.format(msg));
+        }
       } else {
         queue.channelClosed(new ChannelClosedException(msg));
       }
@@ -158,7 +174,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
   }
 
   protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel channel, C clientConnection) {
-    return new ChannelClosedHandler(clientConnection, channel.localAddress(), channel.remoteAddress());
+    return new ChannelClosedHandler(clientConnection, channel);
   }
 
   private class ResponseSenderImpl implements ResponseSender {

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
index c12ff7b..46b7702 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
@@ -19,23 +19,24 @@ package org.apache.drill.exec.rpc;
 
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
+import org.eclipse.jetty.io.Connection;
 
-public class RpcExceptionHandler implements ChannelHandler{
+public class RpcExceptionHandler<C extends RemoteConnection> implements ChannelHandler{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcExceptionHandler.class);
 
-  private final String name;
+  private final C connection;
 
-  public RpcExceptionHandler(String name) {
-    this.name = name;
+  public RpcExceptionHandler(C connection){
+    this.connection = connection;
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
     if(!ctx.channel().isOpen() || cause.getMessage().equals("Connection reset by peer")){
-      logger.warn("Exception occurred with closed channel.  Connection: {}", name, cause);
+      logger.warn("Exception occurred with closed channel.  Connection: {}", connection.getName(), cause);
       return;
     }else{
-      logger.error("Exception in RPC communication.  Connection: {}.  Closing connection.", name, cause);
+      logger.error("Exception in RPC communication.  Connection: {}.  Closing connection.", connection.getName(), cause);
       ctx.close();
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index f191271..159f1df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -40,7 +40,7 @@ import com.google.protobuf.MessageLite;
 
 public class ControlClient extends BasicClient<RpcType, ControlConnection, BitControlHandshake, BitControlHandshake>{
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);
 
   private final ControlMessageHandler handler;
   private final DrillbitEndpoint remoteEndpoint;
@@ -66,6 +66,7 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
   @SuppressWarnings("unchecked")
   @Override
   public ControlConnection initRemoteConnection(SocketChannel channel) {
+    super.initRemoteConnection(channel);
     this.connection = new ControlConnection("control client", channel,
         (RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
     return connection;

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index 5e405ab..98ce9e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -69,6 +69,7 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
 
   @Override
   public ControlConnection initRemoteConnection(SocketChannel channel) {
+    super.initRemoteConnection(channel);
     return new ControlConnection("control server", channel, this, allocator);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index b8a07c7..544bab9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -56,6 +56,7 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
 
   @Override
   public DataClientConnection initRemoteConnection(SocketChannel channel) {
+    super.initRemoteConnection(channel);
     this.connection = new DataClientConnection(channel, this);
     return connection;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 061ddcb..80d2d6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -77,6 +77,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
 
   @Override
   public BitServerConnection initRemoteConnection(SocketChannel channel) {
+    super.initRemoteConnection(channel);
     return new BitServerConnection(channel, context.getAllocator());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 72b07ba..a197356 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -175,6 +175,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
 
   @Override
   public UserClientConnection initRemoteConnection(SocketChannel channel) {
+    super.initRemoteConnection(channel);
     return new UserClientConnection(channel);
   }
 


[15/17] drill git commit: DRILL-3098: Set Unix style "line.separator" for tests

Posted by ja...@apache.org.
DRILL-3098: Set Unix style "line.separator" for tests


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/984ee012
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/984ee012
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/984ee012

Branch: refs/heads/master
Commit: 984ee012ae0e0a42efa1b6512d7c14dd7f287d1a
Parents: 7c78244
Author: Aditya Kishore <ad...@apache.org>
Authored: Thu May 14 21:51:18 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:18:03 2015 -0700

----------------------------------------------------------------------
 common/src/test/java/org/apache/drill/test/DrillTest.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/984ee012/common/src/test/java/org/apache/drill/test/DrillTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/drill/test/DrillTest.java b/common/src/test/java/org/apache/drill/test/DrillTest.java
index 8abcb6a..bbe014f 100644
--- a/common/src/test/java/org/apache/drill/test/DrillTest.java
+++ b/common/src/test/java/org/apache/drill/test/DrillTest.java
@@ -40,7 +40,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 public class DrillTest {
   static final Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTest.class);
 
-  protected static final ObjectMapper objectMapper = new ObjectMapper();
+  protected static final ObjectMapper objectMapper;
+  static {
+    System.setProperty("line.separator", "\n");
+    objectMapper = new ObjectMapper();
+  }
 
   static final SystemManager manager = new SystemManager();
 


[17/17] drill git commit: DRILL-3100: TestImpersonationDisabledWithMiniDFS fails on Windows

Posted by ja...@apache.org.
DRILL-3100: TestImpersonationDisabledWithMiniDFS fails on Windows


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d8b19759
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d8b19759
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d8b19759

Branch: refs/heads/master
Commit: d8b19759657698581cc0d01d7038797952888123
Parents: 36ff259
Author: Aditya Kishore <ad...@apache.org>
Authored: Thu May 14 22:00:34 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:18:03 2015 -0700

----------------------------------------------------------------------
 .../impersonation/TestImpersonationDisabledWithMiniDFS.java     | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/d8b19759/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
index e38d6da..6c3b96f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
@@ -46,6 +46,7 @@ public class TestImpersonationDisabledWithMiniDFS extends BaseTestImpersonation
     miniDfsPluginConfig.connection = conf.get(FileSystem.FS_DEFAULT_NAME_KEY);
 
     Map<String, WorkspaceConfig> workspaces = Maps.newHashMap(lfsPluginConfig.workspaces);
+    createAndAddWorkspace(dfsCluster.getFileSystem(), "dfstemp", "/tmp", (short)0777, processUser, processUser, workspaces);
 
     miniDfsPluginConfig.workspaces = workspaces;
     miniDfsPluginConfig.formats = ImmutableMap.copyOf(lfsPluginConfig.formats);
@@ -54,13 +55,13 @@ public class TestImpersonationDisabledWithMiniDFS extends BaseTestImpersonation
     pluginRegistry.createOrUpdate(MINIDFS_STORAGE_PLUGIN_NAME, miniDfsPluginConfig, true);
 
     // Create test table in minidfs.tmp schema for use in test queries
-    test(String.format("CREATE TABLE %s.tmp.dfsRegion AS SELECT * FROM cp.`region.json`", MINIDFS_STORAGE_PLUGIN_NAME));
+    test(String.format("CREATE TABLE %s.dfstemp.dfsRegion AS SELECT * FROM cp.`region.json`", MINIDFS_STORAGE_PLUGIN_NAME));
   }
 
   @Test // DRILL-3037
   public void testSimpleQuery() throws Exception {
     final String query =
-        String.format("SELECT sales_city, sales_country FROM tmp.dfsRegion ORDER BY region_id DESC LIMIT 2");
+        String.format("SELECT sales_city, sales_country FROM dfstemp.dfsRegion ORDER BY region_id DESC LIMIT 2");
 
     testBuilder()
         .optionSettingQueriesForTestQuery(String.format("USE %s", MINIDFS_STORAGE_PLUGIN_NAME))


[10/17] drill git commit: Update configuration defaults. Rename bounds system property to drill.enable_unsafe_memory_access.

Posted by ja...@apache.org.
Update configuration defaults.  Rename bounds system property to drill.enable_unsafe_memory_access.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f0b3671d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f0b3671d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f0b3671d

Branch: refs/heads/master
Commit: f0b3671d60d92800411d81ad283430d0eef01c96
Parents: 22d6465
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 18:09:11 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:53 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/drill/exec/util/AssertionUtil.java    | 2 +-
 exec/java-exec/src/main/resources/drill-module.conf                | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f0b3671d/exec/java-exec/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/AssertionUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
index 20c2f8e..2348e62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
@@ -28,7 +28,7 @@ public class AssertionUtil {
     boolean isAssertEnabled = false;
     assert isAssertEnabled = true;
     ASSERT_ENABLED = isAssertEnabled;
-    BOUNDS_CHECKING_ENABLED = ASSERT_ENABLED || !"false".equals(System.getProperty("bounds"));
+    BOUNDS_CHECKING_ENABLED = ASSERT_ENABLED || !"true".equals(System.getProperty("drill.enable_unsafe_memory_access"));
   }
 
   public static boolean isAssertionsEnabled(){

http://git-wip-us.apache.org/repos/asf/drill/blob/f0b3671d/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 6fb9340..66055f1 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -37,7 +37,7 @@ drill.exec: {
       }
     },
     bit: {
-      timeout: 30,
+      timeout: 300,
       server: {
         port : 31011,
         retry:{


[11/17] drill git commit: DRILL-3052, DRILL-3066: Improve fragment state management in face of early cancellation.

Posted by ja...@apache.org.
DRILL-3052, DRILL-3066: Improve fragment state management in face of early cancellation.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/aaf9fb83
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/aaf9fb83
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/aaf9fb83

Branch: refs/heads/master
Commit: aaf9fb834e02b9a3483758dd6eb475cc781db866
Parents: 4b0b3a6
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 20:07:29 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:14:56 2015 -0700

----------------------------------------------------------------------
 .../exec/work/fragment/FragmentExecutor.java    | 106 ++++++++++---------
 1 file changed, 56 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/aaf9fb83/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index ffb76b1..e5e0700 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.work.fragment;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.drill.common.DeferredException;
@@ -53,6 +54,7 @@ public class FragmentExecutor implements Runnable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
   private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(FragmentExecutor.class);
 
+  private final AtomicBoolean hasCloseoutThread = new AtomicBoolean(false);
   private final String fragmentName;
   private final FragmentContext fragmentContext;
   private final StatusReporter listener;
@@ -139,25 +141,10 @@ public class FragmentExecutor implements Runnable {
    * so we need to be careful about the state transitions that can result.
    */
   public void cancel() {
-    /*
-     * When cancel() is called before run(), root is not initialized and the executor is not
-     * ready to accept external events. So do not wait to change the state.
-     *
-     * For example, consider the case when the Foreman sets up the root fragment executor which is
-     * waiting on incoming data, but the Foreman fails to setup non-root fragment executors. The
-     * run() method on the root executor will never be called, and the executor will never be ready
-     * to accept external events. This would make the cancelling thread wait forever, if it was waiting on
-     * acceptExternalEvents.
-     */
-    synchronized (this) {
-      if (root != null) {
-        acceptExternalEvents.awaitUninterruptibly();
-      } else {
-        // This fragment may or may not start running. If it doesn't then closeOutResources() will never be called.
-        // Assuming it's safe to call closeOutResources() multiple times, we call it here explicitly in case this
-        // fragment will never start running.
-        closeOutResources();
-      }
+    final boolean thisIsOnlyThread = this.hasCloseoutThread.compareAndSet(false, true);
+
+    if (!thisIsOnlyThread) {
+      acceptExternalEvents.awaitUninterruptibly();
 
       /*
        * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
@@ -165,14 +152,33 @@ public class FragmentExecutor implements Runnable {
       updateState(FragmentState.CANCELLATION_REQUESTED);
 
       /*
-       * Interrupt the thread so that it exits from any blocking operation it could be executing currently.
+       * Interrupt the thread so that it exits from any blocking operation it could be executing currently. We
+       * synchronize here to ensure we don't accidentally create a race condition where we interrupt the close out
+       * procedure of the main thread.
        */
-      final Thread myThread = myThreadRef.get();
-      if (myThread != null) {
-        logger.debug("Interrupting fragment thread {}", myThread.getName());
-        myThread.interrupt();
+      synchronized (myThreadRef) {
+        final Thread myThread = myThreadRef.get();
+        if (myThread != null) {
+          logger.debug("Interrupting fragment thread {}", myThread.getName());
+          myThread.interrupt();
+        }
       }
+    } else {
+      updateState(FragmentState.CANCELLATION_REQUESTED);
+      cleanup(FragmentState.FINISHED);
     }
+
+  }
+
+  private void cleanup(FragmentState state) {
+
+    closeOutResources();
+
+    updateState(state);
+    // send the final state of the fragment. only the main execution thread can send the final state and it can
+    // only be sent once.
+    sendFinalState();
+
   }
 
   /**
@@ -203,6 +209,11 @@ public class FragmentExecutor implements Runnable {
 
   @Override
   public void run() {
+    // if a cancel thread has already entered this executor, we have no reason to continue.
+    if (!hasCloseoutThread.compareAndSet(false, true)) {
+      return;
+    }
+
     final Thread myThread = Thread.currentThread();
     myThreadRef.set(myThread);
     final String originalThreadName = myThread.getName();
@@ -216,31 +227,24 @@ public class FragmentExecutor implements Runnable {
 
       myThread.setName(newThreadName);
 
-      synchronized (this) {
-        /*
-         * fragmentState might have changed even before this method is called e.g. cancel()
-         */
-        if (shouldContinue()) {
-          // if we didn't get the root operator when the executor was created, create it now.
-          final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
-              drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
+      // if we didn't get the root operator when the executor was created, create it now.
+      final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
+          drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
 
           root = ImplCreator.getExec(fragmentContext, rootOperator);
           if (root == null) {
             return;
           }
 
-          clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
-          updateState(FragmentState.RUNNING);
+      clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
+      updateState(FragmentState.RUNNING);
 
-          acceptExternalEvents.countDown();
+      acceptExternalEvents.countDown();
 
-          final DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
-          logger.debug("Starting fragment {}:{} on {}:{}",
-            fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(),
-            endpoint.getAddress(), endpoint.getUserPort());
-        }
-      }
+      final DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
+      logger.debug("Starting fragment {}:{} on {}:{}",
+          fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(),
+          endpoint.getAddress(), endpoint.getUserPort());
 
       final UserGroupInformation queryUserUgi = fragmentContext.isImpersonationEnabled() ?
           ImpersonationUtil.createProxyUgi(fragmentContext.getQueryUserName()) :
@@ -275,21 +279,23 @@ public class FragmentExecutor implements Runnable {
       fail(e);
     } finally {
 
+      // no longer allow this thread to be interrupted. We synchronize here to make sure that cancel can't set an
+      // interruption after we have moved beyond this block.
+      synchronized (myThreadRef) {
+        myThreadRef.set(null);
+        Thread.interrupted();
+      }
+
       // We need to sure we countDown at least once. We'll do it here to guarantee that.
       acceptExternalEvents.countDown();
 
-      closeOutResources();
-
-      updateState(FragmentState.FINISHED);
-      // send the final state of the fragment. only the main execution thread can send the final state and it can
-      // only be sent once.
-      sendFinalState();
+      // here we could be in FAILED, RUNNING, or CANCELLATION_REQUESTED
+      cleanup(FragmentState.FINISHED);
 
       clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
 
       myThread.setName(originalThreadName);
 
-      myThreadRef.set(null);
     }
   }
 
@@ -404,10 +410,10 @@ public class FragmentExecutor implements Runnable {
         errorStateChange(current, target);
       }
 
-    // these should never be requested.
+      // these should never be requested.
+    case CANCELLED:
     case SENDING:
     case AWAITING_ALLOCATION:
-    case CANCELLED:
     default:
       errorStateChange(current, target);
     }


[16/17] drill git commit: DRILL-3099: FileSelection's selectionRoot does not include the scheme and authority

Posted by ja...@apache.org.
DRILL-3099: FileSelection's selectionRoot does not include the scheme and authority


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/36ff2590
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/36ff2590
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/36ff2590

Branch: refs/heads/master
Commit: 36ff259078f3fa9ebc2fb552eaedc8ce14298636
Parents: 984ee01
Author: Aditya Kishore <ad...@apache.org>
Authored: Thu May 14 21:56:13 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:18:03 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/store/dfs/FileSelection.java       | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/36ff2590/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index be9784e..e7f7b28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.dfs;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.Collections;
 import java.util.List;
 
@@ -127,29 +128,30 @@ public class FileSelection {
     return statuses;
   }
 
-  public static String commonPath(FileStatus... paths){
+  private static String commonPath(FileStatus... paths) {
     String commonPath = "";
     String[][] folders = new String[paths.length][];
-    for(int i = 0; i < paths.length; i++){
+    for (int i = 0; i < paths.length; i++) {
       folders[i] = Path.getPathWithoutSchemeAndAuthority(paths[i].getPath()).toString().split("/");
     }
-    for(int j = 0; j < folders[0].length; j++){
+    for (int j = 0; j < folders[0].length; j++) {
       String thisFolder = folders[0][j];
       boolean allMatched = true;
-      for(int i = 1; i < folders.length && allMatched; i++){
-        if(folders[i].length < j){
+      for (int i = 1; i < folders.length && allMatched; i++) {
+        if (folders[i].length < j) {
           allMatched = false;
           break;
         }
         allMatched &= folders[i][j].equals(thisFolder);
       }
-      if(allMatched){
+      if (allMatched) {
         commonPath += thisFolder + "/";
-      }else{
+      } else {
         break;
       }
     }
-    return commonPath;
+    URI oneURI = paths[0].getPath().toUri();
+    return new Path(oneURI.getScheme(), oneURI.getAuthority(), commonPath).toString();
   }
 
   public static FileSelection create(DrillFileSystem fs, String parent, String path) throws IOException {