You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/04/28 20:33:52 UTC
[2/2] camel git commit: CAMEL-8713: ParallelAggregate option when using parallel mode does not run in parallel
CAMEL-8713: ParallelAggregate option when using parallel mode does not run in parallel
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/943b555d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/943b555d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/943b555d
Branch: refs/heads/camel-2.15.x
Commit: 943b555d62b64817d829e25e5067e27559fbe6e1
Parents: fa726cd
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 28 17:17:28 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 28 20:37:22 2015 +0200
----------------------------------------------------------------------
.../camel/processor/MulticastProcessor.java | 124 ++++++++++++++-----
1 file changed, 93 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/943b555d/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index b953d08..29d0a8b 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -352,7 +352,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
}
/**
- * Task to aggregate on-the-fly for completed tasks when using parallel processing.
+ * Boss worker to control the aggregation on-the-fly of completed tasks when using parallel processing.
* <p/>
* This ensures lower memory consumption as we do not need to keep all completed tasks in memory
* before we perform aggregation. Instead this separate thread will run and aggregate when new
@@ -407,21 +407,21 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
}
private void aggregateOnTheFly() throws InterruptedException, ExecutionException {
- boolean timedOut = false;
+ final AtomicBoolean timedOut = new AtomicBoolean();
boolean stoppedOnException = false;
final StopWatch watch = new StopWatch();
- int aggregated = 0;
+ final AtomicInteger aggregated = new AtomicInteger();
boolean done = false;
// not a for loop as on the fly may still run
while (!done) {
// check if we have already aggregate everything
- if (allTasksSubmitted.get() && aggregated >= total.get()) {
+ if (allTasksSubmitted.get() && aggregated.intValue() >= total.get()) {
LOG.debug("Done aggregating {} exchanges on the fly.", aggregated);
break;
}
Future<Exchange> future;
- if (timedOut) {
+ if (timedOut.get()) {
// we are timed out but try to grab if some tasks has been completed
// poll will return null if no tasks is present
future = completion.poll();
@@ -444,27 +444,12 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
}
if (future == null) {
- // timeout occurred
- AggregationStrategy strategy = getAggregationStrategy(null);
- if (strategy instanceof TimeoutAwareAggregationStrategy) {
- // notify the strategy we timed out
- Exchange oldExchange = result.get();
- if (oldExchange == null) {
- // if they all timed out the result may not have been set yet, so use the original exchange
- oldExchange = original;
- }
- ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated, total.intValue(), timeout);
+ ParallelAggregateTimeoutTask task = new ParallelAggregateTimeoutTask(original, result, completion, aggregated, total, timedOut);
+ if (parallelAggregate) {
+ aggregateExecutorService.submit(task);
} else {
- // log a WARN we timed out since it will not be aggregated and the Exchange will be lost
- LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated);
- }
- LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated);
- timedOut = true;
-
- // mark that index as timed out, which allows us to try to retrieve
- // any already completed tasks in the next loop
- if (completion instanceof SubmitOrderedCompletionService) {
- ((SubmitOrderedCompletionService<?>) completion).timeoutTask();
+ // in non parallel mode then just run the task
+ task.run();
}
} else {
// there is a result to aggregate
@@ -483,18 +468,18 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
}
// we got a result so aggregate it
+ ParallelAggregateTask task = new ParallelAggregateTask(result, subExchange, aggregated);
if (parallelAggregate) {
- doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
+ aggregateExecutorService.submit(task);
} else {
- doAggregate(getAggregationStrategy(subExchange), result, subExchange);
+ // in non parallel mode then just run the task
+ task.run();
}
}
-
- aggregated++;
}
- if (timedOut || stoppedOnException) {
- if (timedOut) {
+ if (timedOut.get() || stoppedOnException) {
+ if (timedOut.get()) {
LOG.debug("Cancelling tasks due timeout after {} millis.", timeout);
}
if (stoppedOnException) {
@@ -506,6 +491,83 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
}
}
+ /**
+ * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing.
+ */
+ private final class ParallelAggregateTask implements Runnable {
+
+ private final AtomicExchange result;
+ private final Exchange subExchange;
+ private final AtomicInteger aggregated;
+
+ private ParallelAggregateTask(AtomicExchange result, Exchange subExchange, AtomicInteger aggregated) {
+ this.result = result;
+ this.subExchange = subExchange;
+ this.aggregated = aggregated;
+ }
+
+ @Override
+ public void run() {
+ if (parallelAggregate) {
+ doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
+ } else {
+ doAggregate(getAggregationStrategy(subExchange), result, subExchange);
+ }
+ aggregated.incrementAndGet();
+ }
+ }
+
+ /**
+ * Worker task to handle a timeout on-the-fly when using parallel processing: notifies a timeout-aware aggregation strategy (or logs a WARN), flags the timeout and increments the aggregated counter.
+ */
+ private final class ParallelAggregateTimeoutTask implements Runnable {
+
+ private final Exchange original;
+ private final AtomicExchange result;
+ private final CompletionService<Exchange> completion;
+ private final AtomicInteger aggregated;
+ private final AtomicInteger total;
+ private final AtomicBoolean timedOut;
+
+ private ParallelAggregateTimeoutTask(Exchange original, AtomicExchange result, CompletionService<Exchange> completion,
+ AtomicInteger aggregated, AtomicInteger total, AtomicBoolean timedOut) {
+ this.original = original;
+ this.result = result;
+ this.completion = completion;
+ this.aggregated = aggregated;
+ this.total = total;
+ this.timedOut = timedOut;
+ }
+
+ @Override
+ public void run() {
+ AggregationStrategy strategy = getAggregationStrategy(null);
+ if (strategy instanceof TimeoutAwareAggregationStrategy) {
+ // notify the strategy we timed out
+ Exchange oldExchange = result.get();
+ if (oldExchange == null) {
+ // if they all timed out the result may not have been set yet, so use the original exchange
+ oldExchange = original;
+ }
+ ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated.intValue(), total.intValue(), timeout);
+ } else {
+ // log a WARN we timed out since it will not be aggregated and the Exchange will be lost
+ LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated.intValue());
+ }
+ LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated.intValue());
+ timedOut.set(true);
+
+ // mark that index as timed out, which allows us to try to retrieve
+ // any already completed tasks in the next loop
+ if (completion instanceof SubmitOrderedCompletionService) {
+ ((SubmitOrderedCompletionService<?>) completion).timeoutTask();
+ }
+
+ // we timed out so increment the counter
+ aggregated.incrementAndGet();
+ }
+ }
+
protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception {
AtomicInteger total = new AtomicInteger();
Iterator<ProcessorExchangePair> it = pairs.iterator();