You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/03/26 07:26:40 UTC
[01/10] drill git commit: DRILL-2537: Set a maximum source code size
of 2mb for scalar replacement.
Repository: drill
Updated Branches:
refs/heads/0.8.0 [created] 462e50ce9
DRILL-2537: Set a maximum source code size of 2mb for scalar replacement.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1efdbf52
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1efdbf52
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1efdbf52
Branch: refs/heads/0.8.0
Commit: 1efdbf522749ec0c1ad77d1295af2259873ab81d
Parents: f1b59ed
Author: Jacques Nadeau <ja...@apache.org>
Authored: Tue Mar 24 10:46:43 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Mar 24 10:46:43 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/drill/exec/compile/ClassTransformer.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/1efdbf52/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
index d4d74dd..3c93599 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
@@ -42,6 +42,8 @@ import com.google.common.collect.Sets;
public class ClassTransformer {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClassTransformer.class);
+ private static final int MAX_SCALAR_REPLACE_CODE_SIZE = 2*1024*1024; // 2meg
+
private final ByteCodeLoader byteCodeLoader = new ByteCodeLoader();
private final OptionManager optionManager;
@@ -257,7 +259,7 @@ public class ClassTransformer {
* we're using TRY.
*/
MergedClassResult result = null;
- boolean scalarReplace = scalarReplacementOption != ScalarReplacementOption.OFF;
+ boolean scalarReplace = scalarReplacementOption != ScalarReplacementOption.OFF && entireClass.length() < MAX_SCALAR_REPLACE_CODE_SIZE;
while(true) {
try {
result = MergeAdapter.getMergedClass(nextSet, precompiledBytes, generatedNode, scalarReplace);
[07/10] drill git commit: DRILL-2520: Foreman is being removed from
the running query table prematurely. WorkManager: - reinstate retireForeman()
on WorkerBee - don't use SelfCleaningRunnable to remove Foreman from running
query list
Posted by ja...@apache.org.
DRILL-2520: Foreman is being removed from the running query table prematurely
WorkManager:
- reinstate retireForeman() on WorkerBee
- don't use SelfCleaningRunnable to remove Foreman from running query list
Foreman:
- use retireForeman() to remove self from running query list during cleanup
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6fec51cb
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6fec51cb
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6fec51cb
Branch: refs/heads/0.8.0
Commit: 6fec51cbb310bd1669b69f943c220cda7dc88899
Parents: 1c072fe
Author: Chris Westin <cw...@yahoo.com>
Authored: Mon Mar 23 13:31:56 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Mar 25 21:11:22 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/work/WorkManager.java | 31 ++++++++++++++++----
.../apache/drill/exec/work/foreman/Foreman.java | 3 ++
2 files changed, 28 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/6fec51cb/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index a08630a..231e49a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -52,6 +52,7 @@ import org.apache.drill.exec.work.user.UserWorker;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -173,12 +174,30 @@ public class WorkManager implements AutoCloseable {
public class WorkerBee {
public void addNewForeman(final Foreman foreman) {
queries.put(foreman.getQueryId(), foreman);
- executor.execute(new SelfCleaningRunnable(foreman) {
- @Override
- protected void cleanup() {
- queries.remove(foreman.getQueryId(), foreman);
- }
- });
+
+ // We're relying on the Foreman to clean itself up with retireForeman().
+ executor.execute(foreman);
+ }
+
+ /**
+ * Remove the given Foreman from the running query list.
+ *
+ * <p>The running query list is a bit of a misnomer, because it doesn't
+ * necessarily mean that {@link org.apache.drill.exec.work.foreman.Foreman#run()}
+ * is executing. That only lasts for the duration of query setup, after which
+ * the Foreman instance survives as a state machine that reacts to events
+ * from the local root fragment as well as RPC responses from remote Drillbits.</p>
+ *
+ * @param foreman the Foreman to retire
+ */
+ public void retireForeman(final Foreman foreman) {
+ Preconditions.checkNotNull(foreman);
+
+ final QueryId queryId = foreman.getQueryId();
+ final boolean wasRemoved = queries.remove(queryId, foreman);
+ if (!wasRemoved) {
+ throw new IllegalStateException("Couldn't find retiring Foreman for query " + queryId);
+ }
}
public Foreman getForemanForQueryId(final QueryId queryId) {
http://git-wip-us.apache.org/repos/asf/drill/blob/6fec51cb/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 44112c3..32fd650 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -633,6 +633,9 @@ public class Foreman implements Runnable {
logger.warn("Exception sending result to client", resultException);
}
+ // Remove the Foreman from the running query list.
+ bee.retireForeman(Foreman.this);
+
try {
releaseLease();
} finally {
[02/10] drill git commit: DRILL-2541: Plan fragment logging causes
significant delay in query start
Posted by ja...@apache.org.
DRILL-2541: Plan fragment logging causes significant delay in query start
Foreman
- change plan fragment logging from INFO level to TRACE level
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/83a726cb
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/83a726cb
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/83a726cb
Branch: refs/heads/0.8.0
Commit: 83a726cba49d1efd48ba0e0922f467f0ebcce69b
Parents: 1efdbf5
Author: Chris Westin <cw...@yahoo.com>
Authored: Tue Mar 24 13:02:10 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Mar 24 18:13:29 2015 -0700
----------------------------------------------------------------------
.../main/java/org/apache/drill/exec/work/foreman/Foreman.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/83a726cb/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index bfb6de8..44112c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -439,7 +439,7 @@ public class Foreman implements Runnable {
queryId, queryContext.getActiveEndpoints(), drillbitContext.getPlanReader(), rootFragment,
initiatingClient.getSession(), queryContext.getQueryDateTimeInfo());
- if (logger.isInfoEnabled()) {
+ if (logger.isTraceEnabled()) {
final StringBuilder sb = new StringBuilder();
sb.append("PlanFragments for query ");
sb.append(queryId);
@@ -477,7 +477,7 @@ public class Foreman implements Runnable {
}
sb.append(jsonString);
- logger.info(sb.toString());
+ logger.trace(sb.toString());
}
}
[06/10] drill git commit: DRILL-2502: Improve code safety by
providing a generic event delivery mechanism. This fixes some problems with
broken state transitions that happen under
queryManager.cancelExecutingFragments(), which cause recursive entry into Foreman.moveToState().
Posted by ja...@apache.org.
DRILL-2502: Improve code safety by providing a generic event delivery mechanism
This fixes some problems with broken state transitions that happen under
queryManager.cancelExecutingFragments(), which cause recursive entry into
Foreman.moveToState().
EventProcessor
- created
Foreman
- altered to use the EventProcessor for moveToState()
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f89c59b3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f89c59b3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f89c59b3
Branch: refs/heads/0.8.0
Commit: f89c59b392a3ffd93df05a9d0414bfa8ba37edf7
Parents: 6fec51c
Author: Chris Westin <cw...@yahoo.com>
Authored: Tue Mar 24 16:20:06 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Mar 25 21:11:22 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/common/EventProcessor.java | 104 +++++++++++
.../apache/drill/exec/work/foreman/Foreman.java | 171 +++++++++++--------
2 files changed, 207 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/f89c59b3/common/src/main/java/org/apache/drill/common/EventProcessor.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/EventProcessor.java b/common/src/main/java/org/apache/drill/common/EventProcessor.java
new file mode 100644
index 0000000..617801b
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/EventProcessor.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common;
+
+import java.util.LinkedList;
+
+/**
+ * Process events serially.
+ *
+ * <p>Our use of listeners that deliver events directly can sometimes
+ * cause problems when events are delivered recursively in the middle of
+ * event handling by the same object. This helper class can be used to
+ * serialize events in such cases. If an event is being processed, arriving
+ * events are queued. Once the current event handling is completed, the
+ * next event on the queue is processed; this continues until the event
+ * queue is empty. The first thread to arrive will process its own event
+ * and all other events that arrive during that processing. Other threads
+ * will just enqueue their events.</p>
+ *
+ * @param <T> the event class
+ */
+public abstract class EventProcessor<T> {
+ private final LinkedList<T> queuedEvents = new LinkedList<>();
+ private volatile boolean isProcessing = false;
+
+ /**
+ * Constructor.
+ */
+ public EventProcessor() {
+ }
+
+ /**
+ * Send an event to the processor. If the processor is not busy, the event
+ * will be processed. If busy, the event will be queued to be processed after
+ * any prior events are processed.
+ *
+ * <p>If an event's processing causes an exception, it will be added to any
+ * previous exceptions as a suppressed exception. Once all the currently queued
+ * events have been processed, a single exception will be thrown.</p>
+ *
+ * @param newEvent the new event
+ */
+ public void sendEvent(final T newEvent) {
+ synchronized (queuedEvents) {
+ if (isProcessing) {
+ queuedEvents.addLast(newEvent);
+ return;
+ }
+
+ isProcessing = true;
+ }
+
+ @SuppressWarnings("resource")
+ final DeferredException deferredException = new DeferredException();
+ T event = newEvent;
+ while (true) {
+ try {
+ processEvent(event);
+ } catch (Exception e) {
+ deferredException.addException(e);
+ } catch (AssertionError ae) {
+ deferredException.addException(new RuntimeException("Caught an assertion", ae));
+ }
+
+ synchronized (queuedEvents) {
+ if (queuedEvents.isEmpty()) {
+ isProcessing = false;
+ break;
+ }
+
+ event = queuedEvents.removeFirst();
+ }
+ }
+
+ try {
+ deferredException.close();
+ } catch(Exception e) {
+ throw new RuntimeException("Exceptions caught during event processing", e);
+ }
+ }
+
+ /**
+ * Process a single event. Derived classes provide the implementation of this
+ * to process events.
+ *
+ * @param event the event to process
+ */
+ protected abstract void processEvent(T event);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f89c59b3/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 32fd650..285b75a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.drill.common.EventProcessor;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.LogicalPlan;
@@ -118,6 +119,7 @@ public class Foreman implements Runnable {
private final CountDownLatch acceptExternalEvents = new CountDownLatch(1); // gates acceptance of external events
private final StateListener stateListener = new StateListener(); // source of external events
private final ResponseSendListener responseListener = new ResponseSendListener();
+ private final StateSwitch stateSwitch = new StateSwitch();
private final ForemanResult foremanResult = new ForemanResult();
/**
@@ -644,87 +646,120 @@ public class Foreman implements Runnable {
}
}
- /**
- * Tells the foreman to move to a new state.
- *
- * @param newState the state to move to
- * @param exception if not null, the exception that drove this state transition (usually a failure)
- */
- private synchronized void moveToState(final QueryState newState, final Exception exception) {
- logger.info("State change requested. {} --> {}", state, newState, exception);
- switch(state) {
- case PENDING:
- if (newState == QueryState.RUNNING) {
- recordNewState(QueryState.RUNNING);
- return;
- }
+ private static class StateEvent {
+ final QueryState newState;
+ final Exception exception;
- //$FALL-THROUGH$
+ StateEvent(final QueryState newState, final Exception exception) {
+ this.newState = newState;
+ this.exception = exception;
+ }
+ }
- case RUNNING: {
- /*
- * For cases that cancel executing fragments, we have to record the new state first, because
- * the cancellation of the local root fragment will cause this to be called recursively.
- */
- switch(newState) {
- case CANCELLATION_REQUESTED: {
- assert exception == null;
- queryManager.markEndTime();
- recordNewState(QueryState.CANCELLATION_REQUESTED);
- queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
- foremanResult.setCompleted(QueryState.CANCELED);
+ private class StateSwitch extends EventProcessor<StateEvent> {
+ public void moveToState(final QueryState newState, final Exception exception) {
+ sendEvent(new StateEvent(newState, exception));
+ }
+
+ @Override
+ protected void processEvent(StateEvent event) {
+ final QueryState newState = event.newState;
+ final Exception exception = event.exception;
+
+ // Log the requested transition before dispatching on the current state.
+ logger.info("State change requested. {} --> {}", state, newState,
+ exception);
+ switch (state) {
+ case PENDING:
+ if (newState == QueryState.RUNNING) {
+ recordNewState(QueryState.RUNNING);
+ return;
+ }
+
+ //$FALL-THROUGH$
+
+ case RUNNING: {
/*
- * We don't close the foremanResult until we've gotten acknowledgements, which
- * happens below in the case for current state == CANCELLATION_REQUESTED.
+ * For cases that cancel executing fragments, we have to record the new
+ * state first, because the cancellation of the local root fragment will
+ * cause this to be called recursively.
*/
- return;
- }
+ switch (newState) {
+ case CANCELLATION_REQUESTED: {
+ assert exception == null;
+ queryManager.markEndTime();
+ recordNewState(QueryState.CANCELLATION_REQUESTED);
+ queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+ foremanResult.setCompleted(QueryState.CANCELED);
+ /*
+ * We don't close the foremanResult until we've gotten
+ * acknowledgements, which happens below in the case for current state
+ * == CANCELLATION_REQUESTED.
+ */
+ return;
+ }
- case COMPLETED: {
- assert exception == null;
- queryManager.markEndTime();
- recordNewState(QueryState.COMPLETED);
- foremanResult.setCompleted(QueryState.COMPLETED);
- foremanResult.close();
- return;
- }
+ case COMPLETED: {
+ assert exception == null;
+ queryManager.markEndTime();
+ recordNewState(QueryState.COMPLETED);
+ foremanResult.setCompleted(QueryState.COMPLETED);
+ foremanResult.close();
+ return;
+ }
- case FAILED: {
- assert exception != null;
- queryManager.markEndTime();
- recordNewState(QueryState.FAILED);
- queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
- foremanResult.setFailed(exception);
- foremanResult.close();
- return;
- }
+ case FAILED: {
+ assert exception != null;
+ queryManager.markEndTime();
+ recordNewState(QueryState.FAILED);
+ queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+ foremanResult.setFailed(exception);
+ foremanResult.close();
+ return;
+ }
- default:
- throw new IllegalStateException("illegal transition from RUNNING to " + newState);
+ default:
+ throw new IllegalStateException("illegal transition from RUNNING to "
+ + newState);
+ }
}
- }
- case CANCELLATION_REQUESTED:
- if ((newState == QueryState.CANCELED) || (newState == QueryState.COMPLETED)
- || (newState == QueryState.FAILED)) {
- /*
- * These amount to a completion of the cancellation requests' cleanup; now we
- * can clean up and send the result.
- */
- foremanResult.close();
+ case CANCELLATION_REQUESTED:
+ if ((newState == QueryState.CANCELED)
+ || (newState == QueryState.COMPLETED)
+ || (newState == QueryState.FAILED)) {
+ /*
+ * These amount to a completion of the cancellation requests' cleanup;
+ * now we can clean up and send the result.
+ */
+ foremanResult.close();
+ }
+ return;
+
+ case CANCELED:
+ case COMPLETED:
+ case FAILED:
+ logger
+ .warn(
+ "Dropping request to move to {} state as query is already at {} state (which is terminal).",
+ newState, state);
+ return;
}
- return;
- case CANCELED:
- case COMPLETED:
- case FAILED:
- logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).",
- newState, state);
- return;
+ throw new IllegalStateException(String.format(
+ "Failure trying to change states: %s --> %s", state.name(),
+ newState.name()));
}
+ }
- throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s",
- state.name(), newState.name()));
+ /**
+ * Tells the foreman to move to a new state.
+ *
+ * @param newState the state to move to
+ * @param exception if not null, the exception that drove this state transition (usually a failure)
+ */
+ private void moveToState(final QueryState newState, final Exception exception) {
+ stateSwitch.moveToState(newState, exception);
}
private void recordNewState(final QueryState newState) {
[05/10] drill git commit: DRILL-2572: Use PrelUtil to get
PlannerSettings for PruneScanRule.
Posted by ja...@apache.org.
DRILL-2572: Use PrelUtil to get PlannerSettings for PruneScanRule.
+ context.getPlannerSettings() returns null sometimes
+ introduced in commit 48c9c01
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/96528369
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/96528369
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/96528369
Branch: refs/heads/0.8.0
Commit: 965283692001df52990d5a3540b840720e08551b
Parents: b559cc6
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Wed Mar 25 17:18:57 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Mar 25 21:11:22 2015 -0700
----------------------------------------------------------------------
.../drill/exec/planner/logical/partition/PruneScanRule.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/96528369/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index b8c9ebf..413259d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -45,6 +45,7 @@ import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.store.dfs.FileSelection;
@@ -119,7 +120,7 @@ public abstract class PruneScanRule extends RelOptRule {
}
protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
- PlannerSettings settings = context.getPlannerSettings();
+ final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
FileSystemPartitionDescriptor descriptor = new FileSystemPartitionDescriptor(settings.getFsPartitionColumnLabel());
final BufferAllocator allocator = context.getAllocator();
[10/10] drill git commit: DRILL-2575: FragmentExecutor.cancel()
blasts through state transitions regardless of current state
Posted by ja...@apache.org.
DRILL-2575: FragmentExecutor.cancel() blasts through state transitions regardless of current state
FragmentExecutor:
- Changed cancel() to behave asynchronously, and for the cancelation request to
be checked at an appropriate place in the run() loop.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/462e50ce
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/462e50ce
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/462e50ce
Branch: refs/heads/0.8.0
Commit: 462e50ce9c4b829c2a4bafdeb9763bfba677c726
Parents: db2e032
Author: Chris Westin <cw...@yahoo.com>
Authored: Wed Mar 25 19:05:25 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Mar 25 21:11:23 2015 -0700
----------------------------------------------------------------------
.../exec/work/fragment/FragmentExecutor.java | 65 +++++++++++++-------
1 file changed, 43 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/462e50ce/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 5592707..a7e6c46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -47,6 +47,7 @@ public class FragmentExecutor implements Runnable {
private final FragmentRoot rootOperator;
private final FragmentContext fragmentContext;
private final StatusReporter listener;
+ private volatile boolean canceled;
private volatile boolean closed;
private RootExec root;
@@ -88,15 +89,15 @@ public class FragmentExecutor implements Runnable {
}
public void cancel() {
- logger.debug("Cancelling fragment {}", fragmentContext.getHandle());
-
- // Note this will be called outside of run(), from another thread
- // Change state checked by main loop to terminate it (if not already done):
- updateState(FragmentState.CANCELLED);
-
- fragmentContext.cancel();
-
- logger.debug("Cancelled fragment {}", fragmentContext.getHandle());
+ /*
+ * Note that this can be called from threads *other* than the one running this runnable(), so
+ * we need to be careful about the state transitions that can result. We set the canceled flag,
+ * and this is checked in the run() loop, where action will be taken as soon as possible.
+ *
+ * If the run loop has already exited, because we've already either completed or failed the query,
+ * then the request to cancel is a no-op anyway, so it doesn't matter that we won't see the flag.
+ */
+ canceled = true;
}
public void receivingFragmentFinished(FragmentHandle handle) {
@@ -142,6 +143,23 @@ public class FragmentExecutor implements Runnable {
* alerting the user--the behavior then is to hang.
*/
while (state.get() == FragmentState.RUNNING_VALUE) {
+ if (canceled) {
+ logger.debug("Cancelling fragment {}", fragmentContext.getHandle());
+
+ // Change state checked by main loop to terminate it (if not already done):
+ updateState(FragmentState.CANCELLED);
+
+ fragmentContext.cancel();
+
+ logger.debug("Cancelled fragment {}", fragmentContext.getHandle());
+
+ /*
+ * The state will be altered because of the updateState(), which would cause
+ * us to fall out of the enclosing while loop; we just short-circuit that here
+ */
+ break;
+ }
+
if (!root.next()) {
if (fragmentContext.isFailed()) {
internalFail(fragmentContext.getFailureCause());
@@ -180,19 +198,21 @@ public class FragmentExecutor implements Runnable {
* be safe to call it more than once. We use this flag to bypass the body if it has
* been called before.
*/
- if (closed) {
- return;
- }
+ synchronized(this) { // synchronize for the state of closed
+ if (closed) {
+ return;
+ }
- final DeferredException deferredException = fragmentContext.getDeferredException();
- try {
- root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
- } catch (RuntimeException e) {
- logger.warn(CLOSE_FAILURE, e);
- deferredException.addException(e);
- }
+ final DeferredException deferredException = fragmentContext.getDeferredException();
+ try {
+ root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
+ } catch (RuntimeException e) {
+ logger.warn(CLOSE_FAILURE, e);
+ deferredException.addException(e);
+ }
- closed = true;
+ closed = true;
+ }
/*
* This must be last, because this may throw deferred exceptions.
@@ -221,7 +241,7 @@ public class FragmentExecutor implements Runnable {
}
/**
- * Updates the fragment state only if the current state matches the expected.
+ * Updates the fragment state if and only if the current state matches the expected.
*
* @param expected expected current state
* @param to target state
@@ -258,7 +278,8 @@ public class FragmentExecutor implements Runnable {
private boolean updateStateOrFail(final FragmentState expected, final FragmentState to) {
final boolean updated = checkAndUpdateState(expected, to);
if (!updated && !isCompleted()) {
- final String msg = "State was different than expected while attempting to update state from %s to %s however current state was %s.";
+ final String msg = "State was different than expected while attempting to update state from %s to %s "
+ + "however current state was %s.";
internalFail(new StateTransitionException(
String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
}
[03/10] drill git commit: DRILL-2446: Improvement in finding Drill
log dir
Posted by ja...@apache.org.
DRILL-2446: Improvement in finding Drill log dir
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b559cc6e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b559cc6e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b559cc6e
Branch: refs/heads/0.8.0
Commit: b559cc6ef692bef8edad4789846b21be6fe3d4bb
Parents: 83a726c
Author: Patrick Wong <pw...@maprtech.com>
Authored: Thu Mar 12 14:48:05 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Mar 24 18:13:40 2015 -0700
----------------------------------------------------------------------
distribution/src/resources/drill-config.sh | 25 +++++++++++++------------
1 file changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/b559cc6e/distribution/src/resources/drill-config.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-config.sh b/distribution/src/resources/drill-config.sh
index 0ee181d..7ddec65 100644
--- a/distribution/src/resources/drill-config.sh
+++ b/distribution/src/resources/drill-config.sh
@@ -78,20 +78,21 @@ fi
. "${DRILL_CONF_DIR}/drill-env.sh"
# get log directory
-if [ "$DRILL_LOG_DIR" = "" ]; then
- DRILL_LOG_DIR=/var/log/drill
- touch $DRILL_LOG_DIR/sqlline.log
- TOUCH_EXIT_CODE=$?
- if [ $TOUCH_EXIT_CODE = 0 ]; then
- echo "Default Drill log directory: $DRILL_LOG_DIR"
- DRILL_LOG_DIR_FALLBACK=0
- else
- #Force DRILL_LOG_DIR to fall back
- DRILL_LOG_DIR_FALLBACK=1
- fi
+# Default the log directory when the environment did not provide one.
+if [ -z "${DRILL_LOG_DIR}" ]; then
+  export DRILL_LOG_DIR=/var/log/drill
+fi
+
+# Probe writability by touching the sqlline log; anything touch prints is discarded.
+touch "$DRILL_LOG_DIR/sqlline.log" > /dev/null 2>&1
+TOUCH_EXIT_CODE=$?
+case "$TOUCH_EXIT_CODE" in
+  0)
+    echo "Drill log directory: $DRILL_LOG_DIR"
+    DRILL_LOG_DIR_FALLBACK=0
+    ;;
+  *)
+    #Force DRILL_LOG_DIR to fall back
+    DRILL_LOG_DIR_FALLBACK=1
+    ;;
+esac
-if [ ! -d $DRILL_LOG_DIR ] || [ $DRILL_LOG_DIR_FALLBACK = 1 ]; then
+if [ ! -d "$DRILL_LOG_DIR" ] || [ "$DRILL_LOG_DIR_FALLBACK" = "1" ]; then
echo "Drill log directory $DRILL_LOG_DIR does not exist or is not writable, defaulting to $DRILL_HOME/log"
DRILL_LOG_DIR=$DRILL_HOME/log
mkdir -p $DRILL_LOG_DIR
[08/10] drill git commit: DRILL-2547: Don't allow Drill to shut down
while queries are still executing This will cause Drillbit.close() to block
until all currently executing fragments have completed.
Posted by ja...@apache.org.
DRILL-2547: Don't allow Drill to shut down while queries are still executing
This will cause Drillbit.close() to block until all currently executing
fragments have completed.
WorkManager
- added waitToExit() and indicateIfSafeToExit(), which use a latch to
wait to shut down if there are active fragments
- waitToExit() times out after 5 seconds
Drillbit
- call WorkManager.waitToExit() in close()
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/26463d38
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/26463d38
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/26463d38
Branch: refs/heads/0.8.0
Commit: 26463d38f356c138ba42b78144842950c33e1cef
Parents: f89c59b
Author: Chris Westin <cw...@yahoo.com>
Authored: Tue Mar 24 16:21:33 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Mar 25 21:11:23 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/server/Drillbit.java | 3 ++
.../org/apache/drill/exec/work/WorkManager.java | 47 +++++++++++++++++++-
2 files changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/26463d38/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 0f531b8..958f2dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -252,6 +252,9 @@ public class Drillbit implements AutoCloseable {
return;
}
+ // wait for anything that is running to complete
+ manager.waitToExit();
+
if (coord != null && registrationHandle != null) {
coord.unregister(registrationHandle);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/26463d38/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 231e49a..e2bcec3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -168,6 +169,46 @@ public class WorkManager implements AutoCloseable {
return dContext;
}
+ private CountDownLatch exitLatch = null; // used to wait to exit when things are still running
+
+ /**
+ * Waits until it is safe to exit. Blocks until all currently running fragments have completed.
+ *
+ * <p>This is intended to be used by {@link org.apache.drill.exec.server.Drillbit#close()}.</p>
+ */
+ public void waitToExit() {
+ synchronized(this) {
+ if (queries.isEmpty() && runningFragments.isEmpty()) {
+ return;
+ }
+
+ exitLatch = new CountDownLatch(1);
+ }
+
+ while(true) {
+ try {
+ exitLatch.await(5, TimeUnit.SECONDS);
+ } catch(InterruptedException e) {
+ // keep waiting
+ }
+ break;
+ }
+ }
+
+ /**
+ * If it is safe to exit, and the exitLatch is in use, signals it so that waitToExit() will
+ * unblock.
+ */
+ private void indicateIfSafeToExit() {
+ synchronized(this) {
+ if (exitLatch != null) {
+ if (queries.isEmpty() && runningFragments.isEmpty()) {
+ exitLatch.countDown();
+ }
+ }
+ }
+ }
+
/**
* Narrowed interface to WorkManager that is made available to tasks it is managing.
*/
@@ -196,8 +237,11 @@ public class WorkManager implements AutoCloseable {
final QueryId queryId = foreman.getQueryId();
final boolean wasRemoved = queries.remove(queryId, foreman);
if (!wasRemoved) {
- throw new IllegalStateException("Couldn't find retiring Foreman for query " + queryId);
+ logger.warn("Couldn't find retiring Foreman for query " + queryId);
+// throw new IllegalStateException("Couldn't find retiring Foreman for query " + queryId);
}
+
+ indicateIfSafeToExit();
}
public Foreman getForemanForQueryId(final QueryId queryId) {
@@ -219,6 +263,7 @@ public class WorkManager implements AutoCloseable {
@Override
protected void cleanup() {
runningFragments.remove(fragmentHandle);
+ indicateIfSafeToExit(); // fragment removed: release the exit latch if no queries or fragments remain
}
});
}
[04/10] drill git commit: DRILL-2567: CONVERT_FROM in where clause
cause the query to fail in planning phase
Posted by ja...@apache.org.
DRILL-2567: CONVERT_FROM in where clause cause the query to fail in planning phase
Set the writerIndex of the ByteBuf returned by Unpooled.wrappedBuffer() to 0.
+ Added a unit test to exercise the code path.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1c072fe8
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1c072fe8
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1c072fe8
Branch: refs/heads/0.8.0
Commit: 1c072fe8ed72bbcea3abf87b256903a42c9d3bec
Parents: 9652836
Author: Aditya Kishore <ad...@apache.org>
Authored: Wed Mar 25 15:00:54 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Mar 25 21:11:22 2015 -0700
----------------------------------------------------------------------
.../store/hbase/CompareFunctionsProcessor.java | 20 +++++++++++++-------
.../drill/hbase/TestHBaseFilterPushDown.java | 13 +++++++++++++
2 files changed, 26 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/1c072fe8/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java
index 1635c5d..803f520 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java
@@ -119,7 +119,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
case "UINT4":
if (valueArg instanceof IntExpression
&& (isEqualityFn || encodingType.startsWith("U"))) {
- bb = Unpooled.wrappedBuffer(new byte[4]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+ bb = newByteBuf(4, encodingType.endsWith("_BE"));
bb.writeInt(((IntExpression)valueArg).getInt());
}
break;
@@ -129,39 +129,39 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
case "UINT8":
if (valueArg instanceof LongExpression
&& (isEqualityFn || encodingType.startsWith("U"))) {
- bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+ bb = newByteBuf(8, encodingType.endsWith("_BE"));
bb.writeLong(((LongExpression)valueArg).getLong());
}
break;
case "FLOAT":
if (valueArg instanceof FloatExpression && isEqualityFn) {
- bb = Unpooled.wrappedBuffer(new byte[4]).order(ByteOrder.BIG_ENDIAN);
+ bb = newByteBuf(4, true);
bb.writeFloat(((FloatExpression)valueArg).getFloat());
}
break;
case "DOUBLE":
if (valueArg instanceof DoubleExpression && isEqualityFn) {
- bb = Unpooled.wrappedBuffer(new byte[8]).order(ByteOrder.BIG_ENDIAN);;
+ bb = newByteBuf(8, true);
bb.writeDouble(((DoubleExpression)valueArg).getDouble());
}
break;
case "TIME_EPOCH":
case "TIME_EPOCH_BE":
if (valueArg instanceof TimeExpression) {
- bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+ bb = newByteBuf(8, encodingType.endsWith("_BE"));
bb.writeLong(((TimeExpression)valueArg).getTime());
}
break;
case "DATE_EPOCH":
case "DATE_EPOCH_BE":
if (valueArg instanceof DateExpression) {
- bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+ bb = newByteBuf(8, encodingType.endsWith("_BE"));
bb.writeLong(((DateExpression)valueArg).getDate());
}
break;
case "BOOLEAN_BYTE":
if (valueArg instanceof BooleanExpression) {
- bb = Unpooled.wrappedBuffer(new byte[1]);
+ bb = newByteBuf(1, false /* does not matter */);
bb.writeByte(((BooleanExpression)valueArg).getBoolean() ? 1 : 0);
}
break;
@@ -194,6 +194,12 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
return false;
}
+ private static ByteBuf newByteBuf(int size, boolean bigEndian) {
+ return Unpooled.wrappedBuffer(new byte[size])
+ .order(bigEndian ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN)
+ .writerIndex(0);
+ }
+
private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
static {
ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
http://git-wip-us.apache.org/repos/asf/drill/blob/1c072fe8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 4e63a3d..ca4c07c 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -118,6 +118,19 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
}
@Test
+ // DRILL-2567: runs EXPLAIN PLAN on a query whose WHERE clause compares
+ // convert_from(row_key, 'INT_BE') with a numeric literal. Per the commit message this
+ // exercises the code path that used to fail in the planning phase.
+ // The trailing argument (1) is the count handed to runHBaseSQLVerifyCount.
+ public void testFilterPushDownConvertExpressionWithNumber() throws Exception {
+ setColumnWidths(new int[] {8, 1100});
+ runHBaseSQLVerifyCount("EXPLAIN PLAN FOR\n"
+ + "SELECT\n"
+ + " row_key\n"
+ + "FROM\n"
+ + " hbase.`[TABLE_NAME]` tableName\n"
+ + "WHERE\n"
+ + " convert_from(row_key, 'INT_BE') = 75"
+ , 1);
+ }
+
+ @Test
public void testFilterPushDownRowKeyLessThanOrEqualTo() throws Exception {
setColumnWidths(new int[] {8, 74, 38});
runHBaseSQLVerifyCount("SELECT\n"
[09/10] drill git commit: DRILL-2574: SendingAccountor can suffer
from lost updates
Posted by ja...@apache.org.
DRILL-2574: SendingAccountor can suffer from lost updates
SendingAccountor
- atomically get and set the message count to wait for
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/db2e0321
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/db2e0321
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/db2e0321
Branch: refs/heads/0.8.0
Commit: db2e032150ff85bc3c0bd0761efc118bc57a18bf
Parents: 26463d3
Author: Chris Westin <cw...@yahoo.com>
Authored: Wed Mar 25 18:44:41 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Mar 25 21:11:23 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/physical/impl/SendingAccountor.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/db2e0321/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
index 8794188..21fc800 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
@@ -27,10 +27,10 @@ import java.util.concurrent.atomic.AtomicInteger;
* TODO: Need to update to use long for number of pending messages.
*/
public class SendingAccountor {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class);
private final AtomicInteger batchesSent = new AtomicInteger(0);
- private Semaphore wait = new Semaphore(0);
+ private final Semaphore wait = new Semaphore(0);
public void increment() {
batchesSent.incrementAndGet();
@@ -42,8 +42,10 @@ public class SendingAccountor {
public synchronized void waitForSendComplete() {
try {
- wait.acquire(batchesSent.get());
- batchesSent.set(0);
+ int waitForBatches;
+ while((waitForBatches = batchesSent.getAndSet(0)) != 0) {
+ wait.acquire(waitForBatches);
+ }
} catch (InterruptedException e) {
logger.warn("Failure while waiting for send complete.", e);
// TODO InterruptedException