You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/01/19 17:18:04 UTC
svn commit: r1435580 - in /camel/trunk/camel-core/src/main/java/org/apache/camel/processor: Enricher.java PollEnricher.java
Author: davsclaus
Date: Sat Jan 19 16:18:03 2013
New Revision: 1435580
URL: http://svn.apache.org/viewvc?rev=1435580&view=rev
Log:
CAMEL-5981: Fixed an issue where the enrich EIP could call the async callback twice
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=1435580&r1=1435579&r2=1435580&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Sat Jan 19 16:18:03 2013
@@ -123,12 +123,11 @@ public class Enricher extends ServiceSup
copyResultsPreservePattern(exchange, resourceExchange);
} else {
prepareResult(exchange);
-
- // prepare the exchanges for aggregation
- ExchangeHelper.prepareAggregation(exchange, resourceExchange);
- Exchange aggregatedExchange;
try {
- aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+ // prepare the exchanges for aggregation
+ ExchangeHelper.prepareAggregation(exchange, resourceExchange);
+
+ Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
if (aggregatedExchange != null) {
// copy aggregation result onto original exchange (preserving pattern)
copyResultsPreservePattern(exchange, aggregatedExchange);
@@ -137,6 +136,8 @@ public class Enricher extends ServiceSup
// if the aggregationStrategy threw an exception, set it on the original exchange
exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
callback.done(false);
+ // we failed so break out now
+ return;
}
}
@@ -162,21 +163,22 @@ public class Enricher extends ServiceSup
} else {
prepareResult(exchange);
- // prepare the exchanges for aggregation
- ExchangeHelper.prepareAggregation(exchange, resourceExchange);
- // must catch any exception from aggregation
- Exchange aggregatedExchange;
try {
- aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+ // prepare the exchanges for aggregation
+ ExchangeHelper.prepareAggregation(exchange, resourceExchange);
+
+ Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+ if (aggregatedExchange != null) {
+ // copy aggregation result onto original exchange (preserving pattern)
+ copyResultsPreservePattern(exchange, aggregatedExchange);
+ }
} catch (Throwable e) {
+ // if the aggregationStrategy threw an exception, set it on the original exchange
exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
callback.done(true);
+ // we failed so break out now
return true;
}
- if (aggregatedExchange != null) {
- // copy aggregation result onto original exchange (preserving pattern)
- copyResultsPreservePattern(exchange, aggregatedExchange);
- }
}
// set property with the uri of the endpoint enriched so we can use that for tracing etc
@@ -196,7 +198,8 @@ public class Enricher extends ServiceSup
* @return created exchange.
*/
protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) {
- Exchange target = source.copy();
+ // copy exchange, and do not share the unit of work
+ Exchange target = ExchangeHelper.createCorrelatedCopy(source, false);
target.setPattern(pattern);
return target;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?rev=1435580&r1=1435579&r2=1435580&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java Sat Jan 19 16:18:03 2013
@@ -139,23 +139,22 @@ public class PollEnricher extends Servic
} else {
prepareResult(exchange);
- // prepare the exchanges for aggregation
- ExchangeHelper.prepareAggregation(exchange, resourceExchange);
- // must catch any exception from aggregation
- Exchange aggregatedExchange;
try {
- aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+ // prepare the exchanges for aggregation
+ ExchangeHelper.prepareAggregation(exchange, resourceExchange);
+ // must catch any exception from aggregation
+ Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+ if (aggregatedExchange != null) {
+ // copy aggregation result onto original exchange (preserving pattern)
+ copyResultsPreservePattern(exchange, aggregatedExchange);
+ // handover any synchronization
+ if (resourceExchange != null) {
+ resourceExchange.handoverCompletions(exchange);
+ }
+ }
} catch (Throwable e) {
throw new CamelExchangeException("Error occurred during aggregation", exchange, e);
}
- if (aggregatedExchange != null) {
- // copy aggregation result onto original exchange (preserving pattern)
- copyResultsPreservePattern(exchange, aggregatedExchange);
- // handover any synchronization
- if (resourceExchange != null) {
- resourceExchange.handoverCompletions(exchange);
- }
- }
}
// set header with the uri of the endpoint enriched so we can use that for tracing etc