You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/04/28 20:33:52 UTC

[2/2] camel git commit: CAMEL-8713: ParallelAggregate option when using parallel mode does not run in parallel

CAMEL-8713: ParallelAggregate option when using parallel mode does not run in parallel


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/943b555d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/943b555d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/943b555d

Branch: refs/heads/camel-2.15.x
Commit: 943b555d62b64817d829e25e5067e27559fbe6e1
Parents: fa726cd
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 28 17:17:28 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 28 20:37:22 2015 +0200

----------------------------------------------------------------------
 .../camel/processor/MulticastProcessor.java     | 124 ++++++++++++++-----
 1 file changed, 93 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/943b555d/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index b953d08..29d0a8b 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -352,7 +352,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
     }
 
     /**
-     * Task to aggregate on-the-fly for completed tasks when using parallel processing.
+     * Boss worker to control aggregate on-the-fly for completed tasks when using parallel processing.
      * <p/>
      * This ensures lower memory consumption as we do not need to keep all completed tasks in memory
      * before we perform aggregation. Instead this separate thread will run and aggregate when new
@@ -407,21 +407,21 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
         }
 
         private void aggregateOnTheFly() throws InterruptedException, ExecutionException {
-            boolean timedOut = false;
+            final AtomicBoolean timedOut = new AtomicBoolean();
             boolean stoppedOnException = false;
             final StopWatch watch = new StopWatch();
-            int aggregated = 0;
+            final AtomicInteger aggregated = new AtomicInteger();
             boolean done = false;
             // not a for loop as on the fly may still run
             while (!done) {
                 // check if we have already aggregate everything
-                if (allTasksSubmitted.get() && aggregated >= total.get()) {
+                if (allTasksSubmitted.get() && aggregated.intValue() >= total.get()) {
                     LOG.debug("Done aggregating {} exchanges on the fly.", aggregated);
                     break;
                 }
 
                 Future<Exchange> future;
-                if (timedOut) {
+                if (timedOut.get()) {
                     // we are timed out but try to grab if some tasks has been completed
                     // poll will return null if no tasks is present
                     future = completion.poll();
@@ -444,27 +444,12 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
                 }
 
                 if (future == null) {
-                    // timeout occurred
-                    AggregationStrategy strategy = getAggregationStrategy(null);
-                    if (strategy instanceof TimeoutAwareAggregationStrategy) {
-                        // notify the strategy we timed out
-                        Exchange oldExchange = result.get();
-                        if (oldExchange == null) {
-                            // if they all timed out the result may not have been set yet, so use the original exchange
-                            oldExchange = original;
-                        }
-                        ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated, total.intValue(), timeout);
+                    ParallelAggregateTimeoutTask task = new ParallelAggregateTimeoutTask(original, result, completion, aggregated, total, timedOut);
+                    if (parallelAggregate) {
+                        aggregateExecutorService.submit(task);
                     } else {
-                        // log a WARN we timed out since it will not be aggregated and the Exchange will be lost
-                        LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated);
-                    }
-                    LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated);
-                    timedOut = true;
-
-                    // mark that index as timed out, which allows us to try to retrieve
-                    // any already completed tasks in the next loop
-                    if (completion instanceof SubmitOrderedCompletionService) {
-                        ((SubmitOrderedCompletionService<?>) completion).timeoutTask();
+                        // in non parallel mode then just run the task
+                        task.run();
                     }
                 } else {
                     // there is a result to aggregate
@@ -483,18 +468,18 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
                     }
 
                     // we got a result so aggregate it
+                    ParallelAggregateTask task = new ParallelAggregateTask(result, subExchange, aggregated);
                     if (parallelAggregate) {
-                        doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
+                        aggregateExecutorService.submit(task);
                     } else {
-                        doAggregate(getAggregationStrategy(subExchange), result, subExchange);
+                        // in non parallel mode then just run the task
+                        task.run();
                     }
                 }
-
-                aggregated++;
             }
 
-            if (timedOut || stoppedOnException) {
-                if (timedOut) {
+            if (timedOut.get() || stoppedOnException) {
+                if (timedOut.get()) {
                     LOG.debug("Cancelling tasks due timeout after {} millis.", timeout);
                 }
                 if (stoppedOnException) {
@@ -506,6 +491,83 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
         }
     }
 
+    /**
+     * Worker task that aggregates one completed sub-exchange into the shared result and then increments the aggregated counter (the increment is skipped if aggregation throws).
+     */
+    private final class ParallelAggregateTask implements Runnable {
+
+        private final AtomicExchange result;
+        private final Exchange subExchange;
+        private final AtomicInteger aggregated;
+
+        private ParallelAggregateTask(AtomicExchange result, Exchange subExchange, AtomicInteger aggregated) {
+            this.result = result;
+            this.subExchange = subExchange;
+            this.aggregated = aggregated;
+        }
+
+        @Override
+        public void run() {
+            if (parallelAggregate) {
+                doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
+            } else {
+                doAggregate(getAggregationStrategy(subExchange), result, subExchange);
+            }
+            aggregated.incrementAndGet();
+        }
+    }
+
+    /**
+     * Worker task that handles a timeout while aggregating on-the-fly: notifies a timeout-aware strategy (or logs a WARN), flags the timeout and counts the timed out task.
+     */
+    private final class ParallelAggregateTimeoutTask implements Runnable {
+
+        private final Exchange original;
+        private final AtomicExchange result;
+        private final CompletionService<Exchange> completion;
+        private final AtomicInteger aggregated;
+        private final AtomicInteger total;
+        private final AtomicBoolean timedOut;
+
+        private ParallelAggregateTimeoutTask(Exchange original, AtomicExchange result, CompletionService<Exchange> completion,
+                                             AtomicInteger aggregated, AtomicInteger total, AtomicBoolean timedOut) {
+            this.original = original;
+            this.result = result;
+            this.completion = completion;
+            this.aggregated = aggregated;
+            this.total = total;
+            this.timedOut = timedOut;
+        }
+
+        @Override
+        public void run() {
+            AggregationStrategy strategy = getAggregationStrategy(null);
+            if (strategy instanceof TimeoutAwareAggregationStrategy) {
+                // the strategy is timeout-aware, so notify it that we timed out
+                Exchange oldExchange = result.get();
+                if (oldExchange == null) {
+                    // no result has been set yet (e.g. everything timed out so far), so fall back to the original exchange
+                    oldExchange = original;
+                }
+                ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated.intValue(), total.intValue(), timeout);
+            } else {
+                // the strategy is not timeout-aware, so log a WARN as the timed out task will not be aggregated and its Exchange will be lost
+                LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated.intValue());
+            }
+            LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated.intValue());
+            timedOut.set(true);
+
+            // mark the current index as timed out on a submit-ordered completion service, which
+            // allows us to try to retrieve any already completed tasks in the next loop
+            if (completion instanceof SubmitOrderedCompletionService) {
+                ((SubmitOrderedCompletionService<?>) completion).timeoutTask();
+            }
+
+            // the timed out task counts as handled too, so increment the counter that is compared against the total
+            aggregated.incrementAndGet();
+        }
+    }
+
     protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception {
         AtomicInteger total = new AtomicInteger();
         Iterator<ProcessorExchangePair> it = pairs.iterator();