You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/03/26 07:26:40 UTC

[01/10] drill git commit: DRILL-2537: Set a maximum source code size of 2mb for scalar replacement.

Repository: drill
Updated Branches:
  refs/heads/0.8.0 [created] 462e50ce9


DRILL-2537: Set a maximum source code size of 2mb for scalar replacement.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1efdbf52
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1efdbf52
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1efdbf52

Branch: refs/heads/0.8.0
Commit: 1efdbf522749ec0c1ad77d1295af2259873ab81d
Parents: f1b59ed
Author: Jacques Nadeau <ja...@apache.org>
Authored: Tue Mar 24 10:46:43 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Mar 24 10:46:43 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/drill/exec/compile/ClassTransformer.java     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/1efdbf52/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
index d4d74dd..3c93599 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
@@ -42,6 +42,8 @@ import com.google.common.collect.Sets;
 public class ClassTransformer {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClassTransformer.class);
 
+  private static final int MAX_SCALAR_REPLACE_CODE_SIZE = 2*1024*1024; // 2meg
+
   private final ByteCodeLoader byteCodeLoader = new ByteCodeLoader();
   private final OptionManager optionManager;
 
@@ -257,7 +259,7 @@ public class ClassTransformer {
          *  we're using TRY.
          */
         MergedClassResult result = null;
-        boolean scalarReplace = scalarReplacementOption != ScalarReplacementOption.OFF;
+        boolean scalarReplace = scalarReplacementOption != ScalarReplacementOption.OFF && entireClass.length() < MAX_SCALAR_REPLACE_CODE_SIZE;
         while(true) {
           try {
             result = MergeAdapter.getMergedClass(nextSet, precompiledBytes, generatedNode, scalarReplace);


[07/10] drill git commit: DRILL-2520: Foreman is being removed from the running query table prematurely WorkManager: - reinstate retireForeman() on WorkerBee - don't use SelfCleaningRunnable to remove Foreman from running query list

Posted by ja...@apache.org.
DRILL-2520: Foreman is being removed from the running query table prematurely
WorkManager:
- reinstate retireForeman() on WorkerBee
- don't use SelfCleaningRunnable to remove Foreman from running query list

Foreman:
- use retireForeman() to remove self from running query list during cleanup


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6fec51cb
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6fec51cb
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6fec51cb

Branch: refs/heads/0.8.0
Commit: 6fec51cbb310bd1669b69f943c220cda7dc88899
Parents: 1c072fe
Author: Chris Westin <cw...@yahoo.com>
Authored: Mon Mar 23 13:31:56 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Mar 25 21:11:22 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/work/WorkManager.java | 31 ++++++++++++++++----
 .../apache/drill/exec/work/foreman/Foreman.java |  3 ++
 2 files changed, 28 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/6fec51cb/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index a08630a..231e49a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -52,6 +52,7 @@ import org.apache.drill.exec.work.user.UserWorker;
 
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -173,12 +174,30 @@ public class WorkManager implements AutoCloseable {
   public class WorkerBee {
     public void addNewForeman(final Foreman foreman) {
       queries.put(foreman.getQueryId(), foreman);
-      executor.execute(new SelfCleaningRunnable(foreman) {
-        @Override
-        protected void cleanup() {
-          queries.remove(foreman.getQueryId(), foreman);
-        }
-      });
+
+      // We're relying on the Foreman to clean itself up with retireForeman().
+      executor.execute(foreman);
+    }
+
+    /**
+     * Remove the given Foreman from the running query list.
+     *
+     * <p>The running query list is a bit of a misnomer, because it doesn't
+     * necessarily mean that {@link org.apache.drill.exec.work.foreman.Foreman#run()}
+     * is executing. That only lasts for the duration of query setup, after which
+     * the Foreman instance survives as a state machine that reacts to events
+     * from the local root fragment as well as RPC responses from remote Drillbits.</p>
+     *
+     * @param foreman the Foreman to retire
+     */
+    public void retireForeman(final Foreman foreman) {
+      Preconditions.checkNotNull(foreman);
+
+      final QueryId queryId = foreman.getQueryId();
+      final boolean wasRemoved = queries.remove(queryId, foreman);
+      if (!wasRemoved) {
+        throw new IllegalStateException("Couldn't find retiring Foreman for query " + queryId);
+      }
     }
 
     public Foreman getForemanForQueryId(final QueryId queryId) {

http://git-wip-us.apache.org/repos/asf/drill/blob/6fec51cb/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 44112c3..32fd650 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -633,6 +633,9 @@ public class Foreman implements Runnable {
         logger.warn("Exception sending result to client", resultException);
       }
 
+      // Remove the Foreman from the running query list.
+      bee.retireForeman(Foreman.this);
+
       try {
         releaseLease();
       } finally {


[02/10] drill git commit: DRILL-2541: Plan fragment logging causes significant delay in query start

Posted by ja...@apache.org.
DRILL-2541: Plan fragment logging causes significant delay in query start

Foreman
- change plan fragment logging from INFO level to TRACE level


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/83a726cb
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/83a726cb
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/83a726cb

Branch: refs/heads/0.8.0
Commit: 83a726cba49d1efd48ba0e0922f467f0ebcce69b
Parents: 1efdbf5
Author: Chris Westin <cw...@yahoo.com>
Authored: Tue Mar 24 13:02:10 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Mar 24 18:13:29 2015 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/drill/exec/work/foreman/Foreman.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/83a726cb/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index bfb6de8..44112c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -439,7 +439,7 @@ public class Foreman implements Runnable {
         queryId, queryContext.getActiveEndpoints(), drillbitContext.getPlanReader(), rootFragment,
         initiatingClient.getSession(), queryContext.getQueryDateTimeInfo());
 
-    if (logger.isInfoEnabled()) {
+    if (logger.isTraceEnabled()) {
       final StringBuilder sb = new StringBuilder();
       sb.append("PlanFragments for query ");
       sb.append(queryId);
@@ -477,7 +477,7 @@ public class Foreman implements Runnable {
         }
         sb.append(jsonString);
 
-        logger.info(sb.toString());
+        logger.trace(sb.toString());
       }
     }
 


[06/10] drill git commit: DRILL-2502: Improve code safety by providing a generic event delivery mechan This fixes some problems with broken state transitions that happen under queryManager.cancelExecutingFragments(), which cause recursive entry into Fore

Posted by ja...@apache.org.
DRILL-2502: Improve code safety by providing a generic event delivery mechan
This fixes some problems with broken state transitions that happen under
queryManager.cancelExecutingFragments(), which cause recursive entry into
Foreman.moveToState().

EventProcessor
- created
Foreman
- altered to use the EventProcessor for moveToState()


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f89c59b3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f89c59b3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f89c59b3

Branch: refs/heads/0.8.0
Commit: f89c59b392a3ffd93df05a9d0414bfa8ba37edf7
Parents: 6fec51c
Author: Chris Westin <cw...@yahoo.com>
Authored: Tue Mar 24 16:20:06 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Mar 25 21:11:22 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/EventProcessor.java | 104 +++++++++++
 .../apache/drill/exec/work/foreman/Foreman.java | 171 +++++++++++--------
 2 files changed, 207 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f89c59b3/common/src/main/java/org/apache/drill/common/EventProcessor.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/EventProcessor.java b/common/src/main/java/org/apache/drill/common/EventProcessor.java
new file mode 100644
index 0000000..617801b
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/EventProcessor.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common;
+
+import java.util.LinkedList;
+
+/**
+ * Process events serially.
+ *
+ * <p>Our use of listeners that deliver events directly can sometimes
+ * cause problems when events are delivered recursively in the middle of
+ * event handling by the same object. This helper class can be used to
+ * serialize events in such cases. If an event is being processed, arriving
+ * events are queued. Once the current event handling is completed, the
+ * next event on the queue is processed; this continues until the event
+ * queue is empty. The first thread to arrive will process its own event
+ * and all other events that arrive during that processing. Other threads
+ * will just enqueue their events.</p>
+ *
+ * @param <T> the event class
+ */
+public abstract class EventProcessor<T> {
+  private final LinkedList<T> queuedEvents = new LinkedList<>();
+  private volatile boolean isProcessing = false;
+
+  /**
+   * Constructor.
+   */
+  public EventProcessor() {
+  }
+
+  /**
+   * Send an event to the processor. If the processor is not busy, the event
+   * will be processed. If busy, the event will be queued to be processed after
+   * any prior events are processed.
+   *
+   * <p>If an event's processing causes an exception, it will be added to any
+   * previous exceptions as a suppressed exception. Once all the currently queued
+   * events have been processed, a single exception will be thrown.</p>
+   *
+   * @param newEvent the new event
+   */
+  public void sendEvent(final T newEvent) {
+    synchronized (queuedEvents) {
+      if (isProcessing) {
+        queuedEvents.addLast(newEvent);
+        return;
+      }
+
+      isProcessing = true;
+    }
+
+    @SuppressWarnings("resource")
+    final DeferredException deferredException = new DeferredException();
+    T event = newEvent;
+    while (true) {
+      try {
+        processEvent(event);
+      } catch (Exception e) {
+        deferredException.addException(e);
+      } catch (AssertionError ae) {
+        deferredException.addException(new RuntimeException("Caught an assertion", ae));
+      }
+
+      synchronized (queuedEvents) {
+        if (queuedEvents.isEmpty()) {
+          isProcessing = false;
+          break;
+        }
+
+        event = queuedEvents.removeFirst();
+      }
+    }
+
+    try {
+      deferredException.close();
+    } catch(Exception e) {
+      throw new RuntimeException("Exceptions caught during event processing", e);
+    }
+  }
+
+  /**
+   * Process a single event. Derived classes provide the implementation of this
+   * to process events.
+   *
+   * @param event the event to process
+   */
+  protected abstract void processEvent(T event);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f89c59b3/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 32fd650..285b75a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Preconditions;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.drill.common.EventProcessor;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.LogicalPlan;
@@ -118,6 +119,7 @@ public class Foreman implements Runnable {
   private final CountDownLatch acceptExternalEvents = new CountDownLatch(1); // gates acceptance of external events
   private final StateListener stateListener = new StateListener(); // source of external events
   private final ResponseSendListener responseListener = new ResponseSendListener();
+  private final StateSwitch stateSwitch = new StateSwitch();
   private final ForemanResult foremanResult = new ForemanResult();
 
   /**
@@ -644,87 +646,120 @@ public class Foreman implements Runnable {
     }
   }
 
-  /**
-   * Tells the foreman to move to a new state.
-   *
-   * @param newState the state to move to
-   * @param exception if not null, the exception that drove this state transition (usually a failure)
-   */
-  private synchronized void moveToState(final QueryState newState, final Exception exception) {
-    logger.info("State change requested.  {} --> {}", state, newState, exception);
-    switch(state) {
-    case PENDING:
-      if (newState == QueryState.RUNNING) {
-        recordNewState(QueryState.RUNNING);
-        return;
-      }
+  private static class StateEvent {
+    final QueryState newState;
+    final Exception exception;
 
-      //$FALL-THROUGH$
+    StateEvent(final QueryState newState, final Exception exception) {
+      this.newState = newState;
+      this.exception = exception;
+    }
+  }
 
-    case RUNNING: {
-      /*
-       * For cases that cancel executing fragments, we have to record the new state first, because
-       * the cancellation of the local root fragment will cause this to be called recursively.
-       */
-      switch(newState) {
-      case CANCELLATION_REQUESTED: {
-        assert exception == null;
-        queryManager.markEndTime();
-        recordNewState(QueryState.CANCELLATION_REQUESTED);
-        queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
-        foremanResult.setCompleted(QueryState.CANCELED);
+  private class StateSwitch extends EventProcessor<StateEvent> {
+    public void moveToState(final QueryState newState, final Exception exception) {
+      sendEvent(new StateEvent(newState, exception));
+    }
+
+    @Override
+    protected void processEvent(StateEvent event) {
+      final QueryState newState = event.newState;
+      final Exception exception = event.exception;
+
+      // TODO Auto-generated method stub
+      logger.info("State change requested.  {} --> {}", state, newState,
+          exception);
+      switch (state) {
+      case PENDING:
+        if (newState == QueryState.RUNNING) {
+          recordNewState(QueryState.RUNNING);
+          return;
+        }
+
+        //$FALL-THROUGH$
+
+      case RUNNING: {
         /*
-         * We don't close the foremanResult until we've gotten acknowledgements, which
-         * happens below in the case for current state == CANCELLATION_REQUESTED.
+         * For cases that cancel executing fragments, we have to record the new
+         * state first, because the cancellation of the local root fragment will
+         * cause this to be called recursively.
          */
-        return;
-      }
+        switch (newState) {
+        case CANCELLATION_REQUESTED: {
+          assert exception == null;
+          queryManager.markEndTime();
+          recordNewState(QueryState.CANCELLATION_REQUESTED);
+          queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+          foremanResult.setCompleted(QueryState.CANCELED);
+          /*
+           * We don't close the foremanResult until we've gotten
+           * acknowledgements, which happens below in the case for current state
+           * == CANCELLATION_REQUESTED.
+           */
+          return;
+        }
 
-      case COMPLETED: {
-        assert exception == null;
-        queryManager.markEndTime();
-        recordNewState(QueryState.COMPLETED);
-        foremanResult.setCompleted(QueryState.COMPLETED);
-        foremanResult.close();
-        return;
-      }
+        case COMPLETED: {
+          assert exception == null;
+          queryManager.markEndTime();
+          recordNewState(QueryState.COMPLETED);
+          foremanResult.setCompleted(QueryState.COMPLETED);
+          foremanResult.close();
+          return;
+        }
 
-      case FAILED: {
-        assert exception != null;
-        queryManager.markEndTime();
-        recordNewState(QueryState.FAILED);
-        queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
-        foremanResult.setFailed(exception);
-        foremanResult.close();
-        return;
-      }
+        case FAILED: {
+          assert exception != null;
+          queryManager.markEndTime();
+          recordNewState(QueryState.FAILED);
+          queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+          foremanResult.setFailed(exception);
+          foremanResult.close();
+          return;
+        }
 
-      default:
-        throw new IllegalStateException("illegal transition from RUNNING to " + newState);
+        default:
+          throw new IllegalStateException("illegal transition from RUNNING to "
+              + newState);
+        }
       }
-    }
 
-    case CANCELLATION_REQUESTED:
-      if ((newState == QueryState.CANCELED) || (newState == QueryState.COMPLETED)
-          || (newState == QueryState.FAILED)) {
-        /*
-         * These amount to a completion of the cancellation requests' cleanup; now we
-         * can clean up and send the result.
-         */
-        foremanResult.close();
+      case CANCELLATION_REQUESTED:
+        if ((newState == QueryState.CANCELED)
+            || (newState == QueryState.COMPLETED)
+            || (newState == QueryState.FAILED)) {
+          /*
+           * These amount to a completion of the cancellation requests' cleanup;
+           * now we can clean up and send the result.
+           */
+          foremanResult.close();
+        }
+        return;
+
+      case CANCELED:
+      case COMPLETED:
+      case FAILED:
+        logger
+            .warn(
+                "Dropping request to move to {} state as query is already at {} state (which is terminal).",
+                newState, state);
+        return;
       }
-      return;
 
-    case CANCELED:
-    case COMPLETED:
-    case FAILED:
-      logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).",
-          newState, state);
-      return;
+      throw new IllegalStateException(String.format(
+          "Failure trying to change states: %s --> %s", state.name(),
+          newState.name()));
     }
+  }
 
-    throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s",
-        state.name(), newState.name()));
+  /**
+   * Tells the foreman to move to a new state.
+   *
+   * @param newState the state to move to
+   * @param exception if not null, the exception that drove this state transition (usually a failure)
+   */
+  private void moveToState(final QueryState newState, final Exception exception) {
+    stateSwitch.moveToState(newState, exception);
   }
 
   private void recordNewState(final QueryState newState) {


[05/10] drill git commit: DRILL-2572: Use PrelUtil to get PlannerSettings for PruneScanRule.

Posted by ja...@apache.org.
DRILL-2572: Use PrelUtil to get PlannerSettings for PruneScanRule.

+ context.getPlannerSettings() returns null sometimes
+ introduced in commit 48c9c01


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/96528369
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/96528369
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/96528369

Branch: refs/heads/0.8.0
Commit: 965283692001df52990d5a3540b840720e08551b
Parents: b559cc6
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Wed Mar 25 17:18:57 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Mar 25 21:11:22 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/planner/logical/partition/PruneScanRule.java       | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/96528369/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index b8c9ebf..413259d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -45,6 +45,7 @@ import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.store.dfs.FileSelection;
@@ -119,7 +120,7 @@ public abstract class PruneScanRule extends RelOptRule {
   }
 
   protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
-    PlannerSettings settings = context.getPlannerSettings();
+    final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
     FileSystemPartitionDescriptor descriptor = new FileSystemPartitionDescriptor(settings.getFsPartitionColumnLabel());
     final BufferAllocator allocator = context.getAllocator();
 


[10/10] drill git commit: DRILL-2575: FragmentExecutor.cancel() blasts through state transitions regardless of current state

Posted by ja...@apache.org.
DRILL-2575: FragmentExecutor.cancel() blasts through state transitions regardless of current state

FragmentExecutor:
- Changed cancel() to behave asynchronously, and for the cancelation request to
  be checked at an appropriate place in the run() loop.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/462e50ce
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/462e50ce
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/462e50ce

Branch: refs/heads/0.8.0
Commit: 462e50ce9c4b829c2a4bafdeb9763bfba677c726
Parents: db2e032
Author: Chris Westin <cw...@yahoo.com>
Authored: Wed Mar 25 19:05:25 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Mar 25 21:11:23 2015 -0700

----------------------------------------------------------------------
 .../exec/work/fragment/FragmentExecutor.java    | 65 +++++++++++++-------
 1 file changed, 43 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/462e50ce/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 5592707..a7e6c46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -47,6 +47,7 @@ public class FragmentExecutor implements Runnable {
   private final FragmentRoot rootOperator;
   private final FragmentContext fragmentContext;
   private final StatusReporter listener;
+  private volatile boolean canceled;
   private volatile boolean closed;
   private RootExec root;
 
@@ -88,15 +89,15 @@ public class FragmentExecutor implements Runnable {
   }
 
   public void cancel() {
-    logger.debug("Cancelling fragment {}", fragmentContext.getHandle());
-
-    // Note this will be called outside of run(), from another thread
-    // Change state checked by main loop to terminate it (if not already done):
-    updateState(FragmentState.CANCELLED);
-
-    fragmentContext.cancel();
-
-    logger.debug("Cancelled fragment {}", fragmentContext.getHandle());
+    /*
+     * Note that this can be called from threads *other* than the one running this runnable(), so
+     * we need to be careful about the state transitions that can result. We set the canceled flag,
+     * and this is checked in the run() loop, where action will be taken as soon as possible.
+     *
+     * If the run loop has already exited, because we've already either completed or failed the query,
+     * then the request to cancel is a no-op anyway, so it doesn't matter that we won't see the flag.
+     */
+    canceled = true;
   }
 
   public void receivingFragmentFinished(FragmentHandle handle) {
@@ -142,6 +143,23 @@ public class FragmentExecutor implements Runnable {
        * alerting the user--the behavior then is to hang.
        */
       while (state.get() == FragmentState.RUNNING_VALUE) {
+        if (canceled) {
+          logger.debug("Cancelling fragment {}", fragmentContext.getHandle());
+
+          // Change state checked by main loop to terminate it (if not already done):
+          updateState(FragmentState.CANCELLED);
+
+          fragmentContext.cancel();
+
+          logger.debug("Cancelled fragment {}", fragmentContext.getHandle());
+
+          /*
+           * The state will be altered because of the updateState(), which would cause
+           * us to fall out of the enclosing while loop; we just short-circuit that here
+           */
+          break;
+        }
+
         if (!root.next()) {
           if (fragmentContext.isFailed()) {
             internalFail(fragmentContext.getFailureCause());
@@ -180,19 +198,21 @@ public class FragmentExecutor implements Runnable {
      * be safe to call it more than once. We use this flag to bypass the body if it has
      * been called before.
      */
-    if (closed) {
-      return;
-    }
+    synchronized(this) { // synchronize for the state of closed
+      if (closed) {
+        return;
+      }
 
-    final DeferredException deferredException = fragmentContext.getDeferredException();
-    try {
-      root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
-    } catch (RuntimeException e) {
-      logger.warn(CLOSE_FAILURE, e);
-      deferredException.addException(e);
-    }
+      final DeferredException deferredException = fragmentContext.getDeferredException();
+      try {
+        root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
+      } catch (RuntimeException e) {
+        logger.warn(CLOSE_FAILURE, e);
+        deferredException.addException(e);
+      }
 
-    closed = true;
+      closed = true;
+    }
 
     /*
      * This must be last, because this may throw deferred exceptions.
@@ -221,7 +241,7 @@ public class FragmentExecutor implements Runnable {
   }
 
   /**
-   * Updates the fragment state only if the current state matches the expected.
+   * Updates the fragment state only iff the current state matches the expected.
    *
    * @param  expected  expected current state
    * @param  to  target state
@@ -258,7 +278,8 @@ public class FragmentExecutor implements Runnable {
   private boolean updateStateOrFail(final FragmentState expected, final FragmentState to) {
     final boolean updated = checkAndUpdateState(expected, to);
     if (!updated && !isCompleted()) {
-      final String msg = "State was different than expected while attempting to update state from %s to %s however current state was %s.";
+      final String msg = "State was different than expected while attempting to update state from %s to %s"
+          + "however current state was %s.";
       internalFail(new StateTransitionException(
           String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
     }


[03/10] drill git commit: DRILL-2446: Improvement in finding Drill log dir

Posted by ja...@apache.org.
DRILL-2446: Improvement in finding Drill log dir


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b559cc6e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b559cc6e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b559cc6e

Branch: refs/heads/0.8.0
Commit: b559cc6ef692bef8edad4789846b21be6fe3d4bb
Parents: 83a726c
Author: Patrick Wong <pw...@maprtech.com>
Authored: Thu Mar 12 14:48:05 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Mar 24 18:13:40 2015 -0700

----------------------------------------------------------------------
 distribution/src/resources/drill-config.sh | 25 +++++++++++++------------
 1 file changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b559cc6e/distribution/src/resources/drill-config.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-config.sh b/distribution/src/resources/drill-config.sh
index 0ee181d..7ddec65 100644
--- a/distribution/src/resources/drill-config.sh
+++ b/distribution/src/resources/drill-config.sh
@@ -78,20 +78,21 @@ fi
 . "${DRILL_CONF_DIR}/drill-env.sh"
 
 # get log directory
-if [ "$DRILL_LOG_DIR" = "" ]; then
-  DRILL_LOG_DIR=/var/log/drill
-  touch $DRILL_LOG_DIR/sqlline.log
-  TOUCH_EXIT_CODE=$?
-  if [ $TOUCH_EXIT_CODE = 0 ]; then
-    echo "Default Drill log directory: $DRILL_LOG_DIR"
-    DRILL_LOG_DIR_FALLBACK=0
-  else
-    #Force DRILL_LOG_DIR to fall back
-    DRILL_LOG_DIR_FALLBACK=1
-  fi
+if [ "x${DRILL_LOG_DIR}" = "x" ]; then
+  export DRILL_LOG_DIR=/var/log/drill
+fi
+
+touch "$DRILL_LOG_DIR/sqlline.log" &> /dev/null
+TOUCH_EXIT_CODE=$?
+if [ "$TOUCH_EXIT_CODE" = "0" ]; then
+  echo "Drill log directory: $DRILL_LOG_DIR"
+  DRILL_LOG_DIR_FALLBACK=0
+else
+  #Force DRILL_LOG_DIR to fall back
+  DRILL_LOG_DIR_FALLBACK=1
 fi
 
-if [ ! -d $DRILL_LOG_DIR ] || [ $DRILL_LOG_DIR_FALLBACK = 1 ]; then
+if [ ! -d "$DRILL_LOG_DIR" ] || [ "$DRILL_LOG_DIR_FALLBACK" = "1" ]; then
   echo "Drill log directory $DRILL_LOG_DIR does not exist or is not writable, defaulting to $DRILL_HOME/log"
   DRILL_LOG_DIR=$DRILL_HOME/log
   mkdir -p $DRILL_LOG_DIR


[08/10] drill git commit: DRILL-2547: Don't allow Drill to shut down while queries are still executing This will cause Drillbit.close() to block until all currently executing fragments have completed.

Posted by ja...@apache.org.
DRILL-2547: Don't allow Drill to shut down while queries are still executing
This will cause Drillbit.close() to block until all currently executing
fragments have completed.

WorkManager
- added waitForExit() and indicateIfSafeToExit(), which use a latch to
  wait to shut down if there are active fragments
  - waitForExit() times out after 5 seconds

Drillbit
- call WorkManager.waitForExit() in close()


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/26463d38
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/26463d38
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/26463d38

Branch: refs/heads/0.8.0
Commit: 26463d38f356c138ba42b78144842950c33e1cef
Parents: f89c59b
Author: Chris Westin <cw...@yahoo.com>
Authored: Tue Mar 24 16:21:33 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Mar 25 21:11:23 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/server/Drillbit.java  |  3 ++
 .../org/apache/drill/exec/work/WorkManager.java | 47 +++++++++++++++++++-
 2 files changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/26463d38/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 0f531b8..958f2dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -252,6 +252,9 @@ public class Drillbit implements AutoCloseable {
       return;
     }
 
+    // wait for anything that is running to complete
+    manager.waitToExit();
+
     if (coord != null && registrationHandle != null) {
       coord.unregister(registrationHandle);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/26463d38/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 231e49a..e2bcec3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -168,6 +169,46 @@ public class WorkManager implements AutoCloseable {
     return dContext;
   }
 
+  private CountDownLatch exitLatch = null; // used to wait to exit when things are still running
+
+  /**
+   * Waits until it is safe to exit. Blocks until all currently running fragments have completed.
+   *
+   * <p>This is intended to be used by {@link org.apache.drill.exec.server.Drillbit#close()}.</p>
+   */
+  public void waitToExit() {
+    synchronized(this) {
+      if (queries.isEmpty() && runningFragments.isEmpty()) {
+        return;
+      }
+
+      exitLatch = new CountDownLatch(1);
+    }
+
+    while(true) {
+      try {
+        exitLatch.await(5, TimeUnit.SECONDS);
+      } catch(InterruptedException e) {
+        // keep waiting
+      }
+      break;
+    }
+  }
+
+  /**
+   * If it is safe to exit, and the exitLatch is in use, signals it so that waitToExit() will
+   * unblock.
+   */
+  private void indicateIfSafeToExit() {
+    synchronized(this) {
+      if (exitLatch != null) {
+        if (queries.isEmpty() && runningFragments.isEmpty()) {
+          exitLatch.countDown();
+        }
+      }
+    }
+  }
+
   /**
    * Narrowed interface to WorkManager that is made available to tasks it is managing.
    */
@@ -196,8 +237,11 @@ public class WorkManager implements AutoCloseable {
       final QueryId queryId = foreman.getQueryId();
       final boolean wasRemoved = queries.remove(queryId, foreman);
       if (!wasRemoved) {
-        throw new IllegalStateException("Couldn't find retiring Foreman for query " + queryId);
+        logger.warn("Couldn't find retiring Foreman for query " + queryId);
+//        throw new IllegalStateException("Couldn't find retiring Foreman for query " + queryId);
       }
+
+      indicateIfSafeToExit();
     }
 
     public Foreman getForemanForQueryId(final QueryId queryId) {
@@ -219,6 +263,7 @@ public class WorkManager implements AutoCloseable {
         @Override
         protected void cleanup() {
           runningFragments.remove(fragmentHandle);
+          indicateIfSafeToExit();
         }
       });
     }


[04/10] drill git commit: DRILL-2567: CONVERT_FROM in where clause cause the query to fail in planning phase

Posted by ja...@apache.org.
DRILL-2567: CONVERT_FROM in where clause cause the query to fail in planning phase

Set the writeIndex of ByteBuf returned by Unpooled.wrappedBuffer() to 0.

+ Added a unit test to exercise the code path.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1c072fe8
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1c072fe8
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1c072fe8

Branch: refs/heads/0.8.0
Commit: 1c072fe8ed72bbcea3abf87b256903a42c9d3bec
Parents: 9652836
Author: Aditya Kishore <ad...@apache.org>
Authored: Wed Mar 25 15:00:54 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Mar 25 21:11:22 2015 -0700

----------------------------------------------------------------------
 .../store/hbase/CompareFunctionsProcessor.java  | 20 +++++++++++++-------
 .../drill/hbase/TestHBaseFilterPushDown.java    | 13 +++++++++++++
 2 files changed, 26 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/1c072fe8/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java
index 1635c5d..803f520 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java
@@ -119,7 +119,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
       case "UINT4":
         if (valueArg instanceof IntExpression
             && (isEqualityFn || encodingType.startsWith("U"))) {
-          bb = Unpooled.wrappedBuffer(new byte[4]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+          bb = newByteBuf(4, encodingType.endsWith("_BE"));
           bb.writeInt(((IntExpression)valueArg).getInt());
         }
         break;
@@ -129,39 +129,39 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
       case "UINT8":
         if (valueArg instanceof LongExpression
             && (isEqualityFn || encodingType.startsWith("U"))) {
-          bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+          bb = newByteBuf(8, encodingType.endsWith("_BE"));
           bb.writeLong(((LongExpression)valueArg).getLong());
         }
         break;
       case "FLOAT":
         if (valueArg instanceof FloatExpression && isEqualityFn) {
-          bb = Unpooled.wrappedBuffer(new byte[4]).order(ByteOrder.BIG_ENDIAN);
+          bb = newByteBuf(4, true);
           bb.writeFloat(((FloatExpression)valueArg).getFloat());
         }
         break;
       case "DOUBLE":
         if (valueArg instanceof DoubleExpression && isEqualityFn) {
-          bb = Unpooled.wrappedBuffer(new byte[8]).order(ByteOrder.BIG_ENDIAN);;
+          bb = newByteBuf(8, true);
           bb.writeDouble(((DoubleExpression)valueArg).getDouble());
         }
         break;
       case "TIME_EPOCH":
       case "TIME_EPOCH_BE":
         if (valueArg instanceof TimeExpression) {
-          bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+          bb = newByteBuf(8, encodingType.endsWith("_BE"));
           bb.writeLong(((TimeExpression)valueArg).getTime());
         }
         break;
       case "DATE_EPOCH":
       case "DATE_EPOCH_BE":
         if (valueArg instanceof DateExpression) {
-          bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+          bb = newByteBuf(8, encodingType.endsWith("_BE"));
           bb.writeLong(((DateExpression)valueArg).getDate());
         }
         break;
       case "BOOLEAN_BYTE":
         if (valueArg instanceof BooleanExpression) {
-          bb = Unpooled.wrappedBuffer(new byte[1]);
+          bb = newByteBuf(1, false /* does not matter */);
           bb.writeByte(((BooleanExpression)valueArg).getBoolean() ? 1 : 0);
         }
         break;
@@ -194,6 +194,12 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
     return false;
   }
 
+  private static ByteBuf newByteBuf(int size, boolean bigEndian) {
+    return Unpooled.wrappedBuffer(new byte[size])
+        .order(bigEndian ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN)
+        .writerIndex(0);
+  }
+
   private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
   static {
     ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();

http://git-wip-us.apache.org/repos/asf/drill/blob/1c072fe8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 4e63a3d..ca4c07c 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -118,6 +118,19 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
   }
 
   @Test
+  public void testFilterPushDownConvertExpressionWithNumber() throws Exception {
+    setColumnWidths(new int[] {8, 1100});
+    runHBaseSQLVerifyCount("EXPLAIN PLAN FOR\n"
+        + "SELECT\n"
+        + "  row_key\n"
+        + "FROM\n"
+        + "  hbase.`[TABLE_NAME]` tableName\n"
+        + "WHERE\n"
+        + "  convert_from(row_key, 'INT_BE') = 75"
+        , 1);
+  }
+
+  @Test
   public void testFilterPushDownRowKeyLessThanOrEqualTo() throws Exception {
     setColumnWidths(new int[] {8, 74, 38});
     runHBaseSQLVerifyCount("SELECT\n"


[09/10] drill git commit: DRILL-2574: SendingAccountor can suffer from lost updates

Posted by ja...@apache.org.
DRILL-2574: SendingAccountor can suffer from lost updates

SendingAccountor
- atomically get and set the message count to wait for


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/db2e0321
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/db2e0321
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/db2e0321

Branch: refs/heads/0.8.0
Commit: db2e032150ff85bc3c0bd0761efc118bc57a18bf
Parents: 26463d3
Author: Chris Westin <cw...@yahoo.com>
Authored: Wed Mar 25 18:44:41 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Mar 25 21:11:23 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/physical/impl/SendingAccountor.java | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/db2e0321/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
index 8794188..21fc800 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
@@ -27,10 +27,10 @@ import java.util.concurrent.atomic.AtomicInteger;
  * TODO: Need to update to use long for number of pending messages.
  */
 public class SendingAccountor {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class);
 
   private final AtomicInteger batchesSent = new AtomicInteger(0);
-  private Semaphore wait = new Semaphore(0);
+  private final Semaphore wait = new Semaphore(0);
 
   public void increment() {
     batchesSent.incrementAndGet();
@@ -42,8 +42,10 @@ public class SendingAccountor {
 
   public synchronized void waitForSendComplete() {
     try {
-      wait.acquire(batchesSent.get());
-      batchesSent.set(0);
+      int waitForBatches;
+      while((waitForBatches = batchesSent.getAndSet(0)) != 0) {
+        wait.acquire(waitForBatches);
+      }
     } catch (InterruptedException e) {
       logger.warn("Failure while waiting for send complete.", e);
       // TODO InterruptedException