You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/01/19 17:23:20 UTC

svn commit: r1435581 - in /camel/branches/camel-2.10.x: ./ camel-core/src/main/java/org/apache/camel/processor/Enricher.java

Author: davsclaus
Date: Sat Jan 19 16:23:20 2013
New Revision: 1435581

URL: http://svn.apache.org/viewvc?rev=1435581&view=rev
Log:
CAMEL-5981: Fixed enrich eip may call async callback twice

Modified:
    camel/branches/camel-2.10.x/   (props changed)
    camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/Enricher.java

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1435580

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=1435581&r1=1435580&r2=1435581&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Sat Jan 19 16:23:20 2013
@@ -123,12 +123,11 @@ public class Enricher extends ServiceSup
                     copyResultsPreservePattern(exchange, resourceExchange);
                 } else {
                     prepareResult(exchange);
-
-                    // prepare the exchanges for aggregation
-                    ExchangeHelper.prepareAggregation(exchange, resourceExchange);
-                    Exchange aggregatedExchange;
                     try {
-                        aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+                        // prepare the exchanges for aggregation
+                        ExchangeHelper.prepareAggregation(exchange, resourceExchange);
+
+                        Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
                         if (aggregatedExchange != null) {
                             // copy aggregation result onto original exchange (preserving pattern)
                             copyResultsPreservePattern(exchange, aggregatedExchange);
@@ -137,6 +136,8 @@ public class Enricher extends ServiceSup
                         // if the aggregationStrategy threw an exception, set it on the original exchange
                         exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
                         callback.done(false);
+                        // we failed so break out now
+                        return;
                     }
                 }
 
@@ -162,21 +163,22 @@ public class Enricher extends ServiceSup
         } else {
             prepareResult(exchange);
 
-            // prepare the exchanges for aggregation
-            ExchangeHelper.prepareAggregation(exchange, resourceExchange);
-            // must catch any exception from aggregation
-            Exchange aggregatedExchange;
             try {
-                aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+                // prepare the exchanges for aggregation
+                ExchangeHelper.prepareAggregation(exchange, resourceExchange);
+
+                Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+                if (aggregatedExchange != null) {
+                    // copy aggregation result onto original exchange (preserving pattern)
+                    copyResultsPreservePattern(exchange, aggregatedExchange);
+                }
             } catch (Throwable e) {
+                // if the aggregationStrategy threw an exception, set it on the original exchange
                 exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
                 callback.done(true);
+                // we failed so break out now
                 return true;
             }
-            if (aggregatedExchange != null) {
-                // copy aggregation result onto original exchange (preserving pattern)
-                copyResultsPreservePattern(exchange, aggregatedExchange);
-            }
         }
 
         // set property with the uri of the endpoint enriched so we can use that for tracing etc
@@ -196,7 +198,8 @@ public class Enricher extends ServiceSup
      * @return created exchange.
      */
     protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) {
-        Exchange target = source.copy();
+        // copy exchange, and do not share the unit of work
+        Exchange target = ExchangeHelper.createCorrelatedCopy(source, false);
         target.setPattern(pattern);
         return target;
     }