You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/23 14:49:32 UTC

svn commit: r957176 - /camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java

Author: davsclaus
Date: Wed Jun 23 12:49:32 2010
New Revision: 957176

URL: http://svn.apache.org/viewvc?rev=957176&view=rev
Log:
CAMEL-2838: Multicast, Recipient List and Splitter EIPs now support async routing engine.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=957176&r1=957175&r2=957176&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Jun 23 12:49:32 2010
@@ -162,14 +162,14 @@ public class MulticastProcessor extends 
     }
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        boolean sync = true;
-
         final AtomicExchange result = new AtomicExchange();
         final Iterable<ProcessorExchangePair> pairs;
 
         // multicast uses fine grained error handling on the output processors
         // so use try .. catch to cater for this
         try {
+            boolean sync = true;
+
             pairs = createProcessorExchangePairs(exchange);
             if (isParallelProcessing()) {
                 // ensure an executor is set when running in parallel
@@ -184,23 +184,17 @@ public class MulticastProcessor extends 
                 // so we break out now, then the callback will be invoked which then continue routing from where we left here
                 return false;
             }
-
-            // copy results back to the original exchange
-            if (result.get() != null) {
-                ExchangeHelper.copyResults(exchange, result.get());
-            }
         } catch (Throwable e) {
-            // multicast uses error handling on its output processors and they have tried to redeliver
-            // so we shall signal back to the other error handlers that we are exhausted and they should not
-            // also try to redeliver as we will then do that twice
-            exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
             exchange.setException(e);
+            // and do the done work
+            doDone(exchange, null, callback, true);
+            return true;
         }
 
-        // cleanup any per exchange aggregation strategy
-        exchange.removeProperty(Exchange.AGGREGATION_STRATEGY);
-
-        callback.done(true);
+        // multicasting was processed successfully
+        // and do the done work
+        Exchange subExchange = result.get() != null ? result.get() : null;
+        doDone(exchange, subExchange, callback, true);
         return true;
     }
 
@@ -347,15 +341,8 @@ public class MulticastProcessor extends 
                     if (stopOnException && subExchange.getException() != null) {
                         // wrap in exception to explain where it failed
                         subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
-                        // multicast uses error handling on its output processors and they have tried to redeliver
-                        // so we shall signal back to the other error handlers that we are exhausted and they should not
-                        // also try to redeliver as we will then do that twice
-                        exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
-                        // and copy the current result to original so it will contain this exception
-                        // cleanup any per exchange aggregation strategy
-                        original.removeProperty(Exchange.AGGREGATION_STRATEGY);
-                        ExchangeHelper.copyResults(original, subExchange);
-                        callback.done(false);
+                        // and do the done work
+                        doDone(original, subExchange, callback, false);
                         return;
                     }
 
@@ -364,15 +351,8 @@ public class MulticastProcessor extends 
                     } catch (Throwable e) {
                         // wrap in exception to explain where it failed
                         subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
-                        // multicast uses error handling on its output processors and they have tried to redeliver
-                        // so we shall signal back to the other error handlers that we are exhausted and they should not
-                        // also try to redeliver as we will then do that twice
-                        original.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
-                        // cleanup any per exchange aggregation strategy
-                        original.removeProperty(Exchange.AGGREGATION_STRATEGY);
-                        // and copy the current result to original so it will contain this exception
-                        ExchangeHelper.copyResults(original, subExchange);
-                        callback.done(false);
+                        // and do the done work
+                        doDone(original, subExchange, callback, false);
                         return;
                     }
 
@@ -397,15 +377,8 @@ public class MulticastProcessor extends 
                         if (stopOnException && subExchange.getException() != null) {
                             // wrap in exception to explain where it failed
                             subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
-                            // multicast uses error handling on its output processors and they have tried to redeliver
-                            // so we shall signal back to the other error handlers that we are exhausted and they should not
-                            // also try to redeliver as we will then do that twice
-                            original.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
-                            // cleanup any per exchange aggregation strategy
-                            original.removeProperty(Exchange.AGGREGATION_STRATEGY);
-                            // and copy the current result to original so it will contain this exception
-                            ExchangeHelper.copyResults(original, subExchange);
-                            callback.done(false);
+                            // and do the done work
+                            doDone(original, subExchange, callback, false);
                             return;
                         }
 
@@ -414,28 +387,17 @@ public class MulticastProcessor extends 
                         } catch (Throwable e) {
                             // wrap in exception to explain where it failed
                             subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
-                            // multicast uses error handling on its output processors and they have tried to redeliver
-                            // so we shall signal back to the other error handlers that we are exhausted and they should not
-                            // also try to redeliver as we will then do that twice
-                            original.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
-                            // cleanup any per exchange aggregation strategy
-                            original.removeProperty(Exchange.AGGREGATION_STRATEGY);
-                            // and copy the current result to original so it will contain this exception
-                            ExchangeHelper.copyResults(original, subExchange);
-                            callback.done(false);
+                            // and do the done work
+                            doDone(original, subExchange, callback, false);
                             return;
                         }
 
                         total.incrementAndGet();
                     }
 
-                    // cleanup any per exchange aggregation strategy
-                    original.removeProperty(Exchange.AGGREGATION_STRATEGY);
-                    // multicasting complete so copy results back to the original exchange
-                    if (result.get() != null) {
-                        ExchangeHelper.copyResults(original, result.get());
-                    }
-                    callback.done(false);
+                    // do the done work
+                    subExchange = result.get() != null ? result.get() : null;
+                    doDone(original, subExchange, callback, false);
                 }
             });
         } finally {
@@ -455,6 +417,34 @@ public class MulticastProcessor extends 
     }
 
     /**
+     * Common work which must be done when we are done multicasting.
+     * <p/>
+     * This logic applies to both synchronous and asynchronous processing, as there are multiple exit points
+     * when using the asynchronous routing engine. It is kept in one method instead of being scattered.
+     *
+     * @param original      the original exchange
+     * @param subExchange   the current sub exchange, can be <tt>null</tt> for the synchronous part
+     * @param callback      the callback
+     * @param doneSync      the <tt>doneSync</tt> parameter to call on callback
+     */
+    protected void doDone(Exchange original, Exchange subExchange, AsyncCallback callback, boolean doneSync) {
+        // cleanup any per exchange aggregation strategy
+        original.removeProperty(Exchange.AGGREGATION_STRATEGY);
+        // the failure may be recorded on the sub exchange only, so detect it before copying and flag it afterwards
+        boolean failed = original.getException() != null || (subExchange != null && subExchange.getException() != null);
+        if (subExchange != null) {
+            // and copy the current result to original so it will contain this exception
+            ExchangeHelper.copyResults(original, subExchange);
+        }
+        if (failed) {
+            // multicast uses error handling on its output processors and they have already tried to redeliver,
+            // so signal the other error handlers that we are exhausted and they must not redeliver a second time
+            original.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
+        }
+        callback.done(doneSync);
+    }
+
+    /**
      * Aggregate the {@link Exchange} with the current result
      *
      * @param strategy the aggregation strategy to use