You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/01/09 11:22:45 UTC

svn commit: r1056903 - in /camel/trunk/camel-core/src/main/java/org/apache/camel: processor/MulticastProcessor.java processor/RecipientList.java util/ServiceHelper.java

Author: davsclaus
Date: Sun Jan  9 10:22:45 2011
New Revision: 1056903

URL: http://svn.apache.org/viewvc?rev=1056903&view=rev
Log:
CAMEL-3497: Fixed subtle issue with aggregation task blocking.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1056903&r1=1056902&r2=1056903&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Sun Jan  9 10:22:45 2011
@@ -145,7 +145,6 @@ public class MulticastProcessor extends 
     private final boolean streaming;
     private final boolean stopOnException;
     private final ExecutorService executorService;
-    private ExecutorService aggregationExecutorService;
     private final long timeout;
     private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>();
 
@@ -234,6 +233,8 @@ public class MulticastProcessor extends 
     protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable<ProcessorExchangePair> pairs,
                                      final boolean streaming, final AsyncCallback callback) throws Exception {
 
+        ObjectHelper.notNull(executorService, "ExecutorService", this);
+
         final CompletionService<Exchange> completion;
         if (streaming) {
             // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence)
@@ -259,8 +260,8 @@ public class MulticastProcessor extends 
             AggregateOnTheFlyTask task = new AggregateOnTheFlyTask(result, original, total, completion, running,
                     aggregationOnTheFlyDone, allTasksSubmitted, executionException);
 
-            // and start the task using the aggregation execution service
-            aggregationExecutorService.submit(task);
+            // and start the aggregation task so we can aggregate on-the-fly
+            executorService.submit(task);
         }
 
         LOG.trace("Starting to submit parallel tasks");
@@ -421,11 +422,15 @@ public class MulticastProcessor extends 
                     }
                     future = completion.poll(left, TimeUnit.MILLISECONDS);
                 } else {
-                    // take will wait until the task is complete
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("Polling completion task #" + aggregated);
                     }
-                    future = completion.take();
+                    // we must not block so poll every second
+                    future = completion.poll(1, TimeUnit.SECONDS);
+                    if (future == null) {
+                        // and continue loop which will recheck if we are done
+                        continue;
+                    }
                 }
 
                 if (future == null && timedOut) {
@@ -866,9 +871,6 @@ public class MulticastProcessor extends 
         if (timeout > 0 && !isParallelProcessing()) {
             throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled");
         }
-        if (isParallelProcessing()) {
-            aggregationExecutorService = getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "AggregationTask");
-        }
         ServiceHelper.startServices(processors);
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=1056903&r1=1056902&r2=1056903&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Sun Jan  9 10:22:45 2011
@@ -111,6 +111,15 @@ public class RecipientList extends Servi
                                                                 isParallelProcessing(), getExecutorService(), isStreaming(), isStopOnException(), getTimeout());
         rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
 
+        // start the service
+        try {
+            ServiceHelper.startService(rlp);
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+
         // now let the multicast process the exchange
         return AsyncProcessorHelper.process(rlp, exchange, callback);
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?rev=1056903&r1=1056902&r2=1056903&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java Sun Jan  9 10:22:45 2011
@@ -59,6 +59,9 @@ public final class ServiceHelper {
      * Starts all of the given services
      */
     public static void startServices(Object... services) throws Exception {
+        if (services == null) {
+            return;
+        }
         for (Object value : services) {
             startService(value);
         }
@@ -68,6 +71,9 @@ public final class ServiceHelper {
      * Starts all of the given services
      */
     public static void startServices(Collection<?> services) throws Exception {
+        if (services == null) {
+            return;
+        }
         for (Object value : services) {
             if (value instanceof Service) {
                 Service service = (Service)value;
@@ -83,6 +89,9 @@ public final class ServiceHelper {
      * Stops all of the given services, throwing the first exception caught
      */
     public static void stopServices(Object... services) throws Exception {
+        if (services == null) {
+            return;
+        }
         List<Object> list = Arrays.asList(services);
         stopServices(list);
     }
@@ -106,6 +115,9 @@ public final class ServiceHelper {
      * Stops all of the given services, throwing the first exception caught
      */
     public static void stopServices(Collection<?> services) throws Exception {
+        if (services == null) {
+            return;
+        }
         Exception firstException = null;
         for (Object value : services) {
             if (value instanceof Service) {
@@ -134,6 +146,9 @@ public final class ServiceHelper {
      * Stops and shutdowns all of the given services, throwing the first exception caught
      */
     public static void stopAndShutdownServices(Object... services) throws Exception {
+        if (services == null) {
+            return;
+        }
         List<Object> list = Arrays.asList(services);
         stopAndShutdownServices(list);
     }
@@ -163,6 +178,9 @@ public final class ServiceHelper {
      * Stops and shutdowns all of the given services, throwing the first exception caught
      */
     public static void stopAndShutdownServices(Collection<?> services) throws Exception {
+        if (services == null) {
+            return;
+        }
         Exception firstException = null;
 
         for (Object value : services) {
@@ -194,6 +212,9 @@ public final class ServiceHelper {
     }
 
     public static void resumeServices(Collection<?> services) throws Exception {
+        if (services == null) {
+            return;
+        }
         Exception firstException = null;
         for (Object value : services) {
             if (value instanceof Service) {
@@ -254,6 +275,9 @@ public final class ServiceHelper {
     }
 
     public static void suspendServices(Collection<?> services) throws Exception {
+        if (services == null) {
+            return;
+        }
         Exception firstException = null;
         for (Object value : services) {
             if (value instanceof Service) {