You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/01/14 11:37:00 UTC
svn commit: r1058930 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java
Author: davsclaus
Date: Fri Jan 14 10:36:59 2011
New Revision: 1058930
URL: http://svn.apache.org/viewvc?rev=1058930&view=rev
Log:
CAMEL-3535: Use a copy of the incoming exchange for aggregation without sharing UoW. This ensures that on compleiton on the completed exchange always will be executed. This ensures the aggregation repository will always be confirmed, and the internal in progress map will have the completed exchange removed as well, preventing it from eating memory over time.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1058930&r1=1058929&r2=1058930&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Jan 14 10:36:59 2011
@@ -177,12 +177,16 @@ public class AggregateProcessor extends
throw new ClosedCorrelationKeyException(key, exchange);
}
+ // copy exchange, and do not share the unit of work
+ // the aggregated output runs in another unit of work
+ Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+
// when memory based then its fast using synchronized, but if the aggregation repository is IO
// bound such as JPA etc then concurrent aggregation per correlation key could
// improve performance as we can run aggregation repository get/add in parallel
lock.lock();
try {
- doAggregation(key, exchange);
+ doAggregation(key, copy);
} finally {
lock.unlock();
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java?rev=1058930&r1=1058929&r2=1058930&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java Fri Jan 14 10:36:59 2011
@@ -16,22 +16,37 @@
*/
package org.apache.camel.processor.aggregator;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.NotifyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
/**
* @version $Revision$
*/
public class BeanBeforeAggregateIssueTest extends ContextTestSupport {
+ private MyAggRepo myRepo = new MyAggRepo();
+
public void testBeanBeforeAggregation() throws Exception {
+ NotifyBuilder notify = new NotifyBuilder(context).whenDone(3).create();
+
getMockEndpoint("mock:result").expectedBodiesReceived("A+B");
template.sendBody("seda:start", "A");
template.sendBody("seda:start", "B");
assertMockEndpointsSatisfied();
+
+ // wait for all exchanges to be done (2 input + 1 aggregated)
+ notify.matches(5, TimeUnit.SECONDS);
+
+ // should have confirmed
+ assertTrue("Should have confirmed", myRepo.isConfirm());
}
@Override
@@ -42,6 +57,7 @@ public class BeanBeforeAggregateIssueTes
from("seda:start")
.bean(TestBean.class)
.aggregate(constant("true"), new BodyInAggregatingStrategy())
+ .aggregationRepository(myRepo)
.completionSize(2)
.to("mock:result");
}
@@ -54,4 +70,20 @@ public class BeanBeforeAggregateIssueTes
return foo;
}
}
+
+ private final class MyAggRepo extends MemoryAggregationRepository {
+
+ private volatile boolean confirm;
+
+ @Override
+ public void confirm(CamelContext camelContext, String exchangeId) {
+ // test that confirm is invoked
+ super.confirm(camelContext, exchangeId);
+ confirm = true;
+ }
+
+ public boolean isConfirm() {
+ return confirm;
+ }
+ }
}