You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/23 14:49:32 UTC
svn commit: r957176 -
/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Author: davsclaus
Date: Wed Jun 23 12:49:32 2010
New Revision: 957176
URL: http://svn.apache.org/viewvc?rev=957176&view=rev
Log:
CAMEL-2838: Multicast, Recipient List and Splitter EIPs now support async routing engine.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=957176&r1=957175&r2=957176&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Jun 23 12:49:32 2010
@@ -162,14 +162,14 @@ public class MulticastProcessor extends
}
public boolean process(Exchange exchange, AsyncCallback callback) {
- boolean sync = true;
-
final AtomicExchange result = new AtomicExchange();
final Iterable<ProcessorExchangePair> pairs;
// multicast uses fine grained error handling on the output processors
// so use try .. catch to cater for this
try {
+ boolean sync = true;
+
pairs = createProcessorExchangePairs(exchange);
if (isParallelProcessing()) {
// ensure an executor is set when running in parallel
@@ -184,23 +184,17 @@ public class MulticastProcessor extends
// so we break out now, then the callback will be invoked which then continue routing from where we left here
return false;
}
-
- // copy results back to the original exchange
- if (result.get() != null) {
- ExchangeHelper.copyResults(exchange, result.get());
- }
} catch (Throwable e) {
- // multicast uses error handling on its output processors and they have tried to redeliver
- // so we shall signal back to the other error handlers that we are exhausted and they should not
- // also try to redeliver as we will then do that twice
- exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
exchange.setException(e);
+ // and do the done work
+ doDone(exchange, null, callback, true);
+ return true;
}
- // cleanup any per exchange aggregation strategy
- exchange.removeProperty(Exchange.AGGREGATION_STRATEGY);
-
- callback.done(true);
+ // multicasting was processed successfully
+ // and do the done work
+ Exchange subExchange = result.get() != null ? result.get() : null;
+ doDone(exchange, subExchange, callback, true);
return true;
}
@@ -347,15 +341,8 @@ public class MulticastProcessor extends
if (stopOnException && subExchange.getException() != null) {
// wrap in exception to explain where it failed
subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
- // multicast uses error handling on its output processors and they have tried to redeliver
- // so we shall signal back to the other error handlers that we are exhausted and they should not
- // also try to redeliver as we will then do that twice
- exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
- // and copy the current result to original so it will contain this exception
- // cleanup any per exchange aggregation strategy
- original.removeProperty(Exchange.AGGREGATION_STRATEGY);
- ExchangeHelper.copyResults(original, subExchange);
- callback.done(false);
+ // and do the done work
+ doDone(original, subExchange, callback, false);
return;
}
@@ -364,15 +351,8 @@ public class MulticastProcessor extends
} catch (Throwable e) {
// wrap in exception to explain where it failed
subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
- // multicast uses error handling on its output processors and they have tried to redeliver
- // so we shall signal back to the other error handlers that we are exhausted and they should not
- // also try to redeliver as we will then do that twice
- original.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
- // cleanup any per exchange aggregation strategy
- original.removeProperty(Exchange.AGGREGATION_STRATEGY);
- // and copy the current result to original so it will contain this exception
- ExchangeHelper.copyResults(original, subExchange);
- callback.done(false);
+ // and do the done work
+ doDone(original, subExchange, callback, false);
return;
}
@@ -397,15 +377,8 @@ public class MulticastProcessor extends
if (stopOnException && subExchange.getException() != null) {
// wrap in exception to explain where it failed
subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
- // multicast uses error handling on its output processors and they have tried to redeliver
- // so we shall signal back to the other error handlers that we are exhausted and they should not
- // also try to redeliver as we will then do that twice
- original.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
- // cleanup any per exchange aggregation strategy
- original.removeProperty(Exchange.AGGREGATION_STRATEGY);
- // and copy the current result to original so it will contain this exception
- ExchangeHelper.copyResults(original, subExchange);
- callback.done(false);
+ // and do the done work
+ doDone(original, subExchange, callback, false);
return;
}
@@ -414,28 +387,17 @@ public class MulticastProcessor extends
} catch (Throwable e) {
// wrap in exception to explain where it failed
subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
- // multicast uses error handling on its output processors and they have tried to redeliver
- // so we shall signal back to the other error handlers that we are exhausted and they should not
- // also try to redeliver as we will then do that twice
- original.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
- // cleanup any per exchange aggregation strategy
- original.removeProperty(Exchange.AGGREGATION_STRATEGY);
- // and copy the current result to original so it will contain this exception
- ExchangeHelper.copyResults(original, subExchange);
- callback.done(false);
+ // and do the done work
+ doDone(original, subExchange, callback, false);
return;
}
total.incrementAndGet();
}
- // cleanup any per exchange aggregation strategy
- original.removeProperty(Exchange.AGGREGATION_STRATEGY);
- // multicasting complete so copy results back to the original exchange
- if (result.get() != null) {
- ExchangeHelper.copyResults(original, result.get());
- }
- callback.done(false);
+ // do the done work
+ subExchange = result.get() != null ? result.get() : null;
+ doDone(original, subExchange, callback, false);
}
});
} finally {
@@ -455,6 +417,34 @@ public class MulticastProcessor extends
}
/**
+ * Common work which must be done when we are done multicasting.
+ * <p/>
+ * This logic applies for both running synchronous and asynchronous as there are multiple exit points
+ * when using the asynchronous routing engine. And therefore we want the logic in one method instead
+ * of being scattered.
+ *
+ * @param original the original exchange
+ * @param subExchange the current sub exchange, can be <tt>null</tt> for the synchronous part
+ * @param callback the callback
+ * @param doneSync the <tt>doneSync</tt> parameter to call on callback
+ */
+ protected void doDone(Exchange original, Exchange subExchange, AsyncCallback callback, boolean doneSync) {
+ // cleanup any per exchange aggregation strategy
+ original.removeProperty(Exchange.AGGREGATION_STRATEGY);
+ if (original.getException() != null) {
+ // multicast uses error handling on its output processors and they have tried to redeliver
+ // so we shall signal back to the other error handlers that we are exhausted and they should not
+ // also try to redeliver as we will then do that twice
+ original.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
+ }
+ if (subExchange != null) {
+ // and copy the current result to original, so it will also contain any exception set on the sub exchange
+ ExchangeHelper.copyResults(original, subExchange);
+ }
+ callback.done(doneSync);
+ }
+
+ /**
* Aggregate the {@link Exchange} with the current result
*
* @param strategy the aggregation strategy to use