You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/01/19 17:25:27 UTC
svn commit: r1435584 - in /camel/branches/camel-2.9.x: ./
camel-core/src/main/java/org/apache/camel/processor/Enricher.java
Author: davsclaus
Date: Sat Jan 19 16:25:26 2013
New Revision: 1435584
URL: http://svn.apache.org/viewvc?rev=1435584&view=rev
Log:
CAMEL-5981: Fixed enrich eip may call async callback twice
Modified:
camel/branches/camel-2.9.x/ (props changed)
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Merged /camel/trunk:r1435580
Merged /camel/branches/camel-2.10.x:r1435581
Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=1435584&r1=1435583&r2=1435584&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Sat Jan 19 16:25:26 2013
@@ -123,12 +123,11 @@ public class Enricher extends ServiceSup
copyResultsPreservePattern(exchange, resourceExchange);
} else {
prepareResult(exchange);
-
- // prepare the exchanges for aggregation
- ExchangeHelper.prepareAggregation(exchange, resourceExchange);
- Exchange aggregatedExchange;
try {
- aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+ // prepare the exchanges for aggregation
+ ExchangeHelper.prepareAggregation(exchange, resourceExchange);
+
+ Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
if (aggregatedExchange != null) {
// copy aggregation result onto original exchange (preserving pattern)
copyResultsPreservePattern(exchange, aggregatedExchange);
@@ -137,6 +136,8 @@ public class Enricher extends ServiceSup
// if the aggregationStrategy threw an exception, set it on the original exchange
exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
callback.done(false);
+ // we failed so break out now
+ return;
}
}
@@ -162,21 +163,22 @@ public class Enricher extends ServiceSup
} else {
prepareResult(exchange);
- // prepare the exchanges for aggregation
- ExchangeHelper.prepareAggregation(exchange, resourceExchange);
- // must catch any exception from aggregation
- Exchange aggregatedExchange;
try {
- aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+ // prepare the exchanges for aggregation
+ ExchangeHelper.prepareAggregation(exchange, resourceExchange);
+
+ Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+ if (aggregatedExchange != null) {
+ // copy aggregation result onto original exchange (preserving pattern)
+ copyResultsPreservePattern(exchange, aggregatedExchange);
+ }
} catch (Throwable e) {
+ // if the aggregationStrategy threw an exception, set it on the original exchange
exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
callback.done(true);
+ // we failed so break out now
return true;
}
- if (aggregatedExchange != null) {
- // copy aggregation result onto original exchange (preserving pattern)
- copyResultsPreservePattern(exchange, aggregatedExchange);
- }
}
// set property with the uri of the endpoint enriched so we can use that for tracing etc
@@ -196,7 +198,8 @@ public class Enricher extends ServiceSup
* @return created exchange.
*/
protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) {
- Exchange target = source.copy();
+ // copy exchange, and do not share the unit of work
+ Exchange target = ExchangeHelper.createCorrelatedCopy(source, false);
target.setPattern(pattern);
return target;
}