You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/11 09:49:59 UTC
svn commit: r1200771 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
Author: davsclaus
Date: Fri Nov 11 08:49:59 2011
New Revision: 1200771
URL: http://svn.apache.org/viewvc?rev=1200771&view=rev
Log:
CAMEL-4660: Aggregate EIP should not use a hard-coded 1s as the initial delay before the first completion interval check, but instead the configured completion interval. Thanks to Ole Hofstad for the patch.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1200771&r1=1200770&r2=1200771&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Nov 11 08:49:59 2011
@@ -859,7 +859,7 @@ public class AggregateProcessor extends
setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
}
// trigger completion based on interval
- getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), 1000L, getCompletionInterval(), TimeUnit.MILLISECONDS);
+ getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS);
}
// start timeout service if its in use
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=1200771&r1=1200770&r2=1200771&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Fri Nov 11 08:49:59 2011
@@ -279,6 +279,52 @@ public class AggregateProcessorTest exte
ap.stop();
}
+
+ public void testAggregateInitialCompletionInterval() throws Exception {
+ // camel context must be started
+ context.start();
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("A+B", "C+D");
+ mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "interval");
+
+ Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+ Expression corr = header("id");
+ AggregationStrategy as = new BodyInAggregatingStrategy();
+
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ ap.setCompletionInterval(2000);
+ ap.start();
+
+ Exchange e1 = new DefaultExchange(context);
+ e1.getIn().setBody("A");
+ e1.getIn().setHeader("id", 123);
+
+ Exchange e2 = new DefaultExchange(context);
+ e2.getIn().setBody("B");
+ e2.getIn().setHeader("id", 123);
+
+ Exchange e3 = new DefaultExchange(context);
+ e3.getIn().setBody("C");
+ e3.getIn().setHeader("id", 123);
+
+ Exchange e4 = new DefaultExchange(context);
+ e4.getIn().setBody("D");
+ e4.getIn().setHeader("id", 123);
+
+ ap.process(e1);
+
+ Thread.sleep(1500L);
+ ap.process(e2);
+
+ Thread.sleep(500L);
+ ap.process(e3);
+ ap.process(e4);
+
+ assertMockEndpointsSatisfied();
+
+ ap.stop();
+ }
public void testAggregateIgnoreInvalidCorrelationKey() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");