You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/01/14 11:37:00 UTC

svn commit: r1058930 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java

Author: davsclaus
Date: Fri Jan 14 10:36:59 2011
New Revision: 1058930

URL: http://svn.apache.org/viewvc?rev=1058930&view=rev
Log:
CAMEL-3535: Use a copy of the incoming exchange for aggregation without sharing UoW. This ensures that on compleiton on the completed exchange always will be executed. This ensures the aggregation repository will always be confirmed, and the internal in progress map will have the completed exchange removed as well, preventing it from eating memory over time.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1058930&r1=1058929&r2=1058930&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Jan 14 10:36:59 2011
@@ -177,12 +177,16 @@ public class AggregateProcessor extends 
             throw new ClosedCorrelationKeyException(key, exchange);
         }
 
+        // copy exchange, and do not share the unit of work
+        // the aggregated output runs in another unit of work
+        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+
         // when memory based then its fast using synchronized, but if the aggregation repository is IO
         // bound such as JPA etc then concurrent aggregation per correlation key could
         // improve performance as we can run aggregation repository get/add in parallel
         lock.lock();
         try {
-            doAggregation(key, exchange);
+            doAggregation(key, copy);
         } finally {
             lock.unlock();
         }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java?rev=1058930&r1=1058929&r2=1058930&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java Fri Jan 14 10:36:59 2011
@@ -16,22 +16,37 @@
  */
 package org.apache.camel.processor.aggregator;
 
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
 
 /**
  * @version $Revision$
  */
 public class BeanBeforeAggregateIssueTest extends ContextTestSupport {
 
+    private MyAggRepo myRepo = new MyAggRepo();
+
     public void testBeanBeforeAggregation() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(3).create();
+
         getMockEndpoint("mock:result").expectedBodiesReceived("A+B");
 
         template.sendBody("seda:start", "A");
         template.sendBody("seda:start", "B");
 
         assertMockEndpointsSatisfied();
+
+        // wait for all exchanges to be done (2 input + 1 aggregated)
+        notify.matches(5, TimeUnit.SECONDS);
+
+        // should have confirmed
+        assertTrue("Should have confirmed", myRepo.isConfirm());
     }
 
     @Override
@@ -42,6 +57,7 @@ public class BeanBeforeAggregateIssueTes
                 from("seda:start")
                     .bean(TestBean.class)
                     .aggregate(constant("true"), new BodyInAggregatingStrategy())
+                        .aggregationRepository(myRepo)
                         .completionSize(2)
                         .to("mock:result");
             }
@@ -54,4 +70,20 @@ public class BeanBeforeAggregateIssueTes
             return foo;
         }
     }
+
+    private final class MyAggRepo extends MemoryAggregationRepository {
+
+        private volatile boolean confirm;
+
+        @Override
+        public void confirm(CamelContext camelContext, String exchangeId) {
+            // test that confirm is invoked
+            super.confirm(camelContext, exchangeId);
+            confirm = true;
+        }
+
+        public boolean isConfirm() {
+            return confirm;
+        }
+    }
 }