You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/11 10:12:34 UTC
svn commit: r1200781 - in /camel/branches/camel-2.8.x: ./
camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
Author: davsclaus
Date: Fri Nov 11 09:12:33 2011
New Revision: 1200781
URL: http://svn.apache.org/viewvc?rev=1200781&view=rev
Log:
CAMEL-4660: Aggregate EIP should not use 1s as the first timeout, but instead the configured value. Thanks to Ole Hofstad for the patch.
Modified:
camel/branches/camel-2.8.x/ (props changed)
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 09:12:33 2011
@@ -1 +1 @@
-/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948,1198199,1198338,1198340,1199123,1199137,1199654,1199683,1199703,1199739,1199804,1200214
+/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948,1198199,1198338,1198340,1199123,1199137,1199654,1199683,1199703,1199739,1199804,1200214,1200771
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1200781&r1=1200780&r2=1200781&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Nov 11 09:12:33 2011
@@ -844,7 +844,7 @@ public class AggregateProcessor extends
LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis.");
ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1);
// trigger completion based on interval
- scheduler.scheduleAtFixedRate(new AggregationIntervalTask(), 1000L, getCompletionInterval(), TimeUnit.MILLISECONDS);
+ scheduler.scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS);
}
// start timeout service if its in use
Modified: camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=1200781&r1=1200780&r2=1200781&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Fri Nov 11 09:12:33 2011
@@ -279,6 +279,52 @@ public class AggregateProcessorTest exte
ap.stop();
}
+
+ public void testAggregateInitialCompletionInterval() throws Exception {
+ // camel context must be started
+ context.start();
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("A+B", "C+D");
+ mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "interval");
+
+ Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+ Expression corr = header("id");
+ AggregationStrategy as = new BodyInAggregatingStrategy();
+
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ ap.setCompletionInterval(2000);
+ ap.start();
+
+ Exchange e1 = new DefaultExchange(context);
+ e1.getIn().setBody("A");
+ e1.getIn().setHeader("id", 123);
+
+ Exchange e2 = new DefaultExchange(context);
+ e2.getIn().setBody("B");
+ e2.getIn().setHeader("id", 123);
+
+ Exchange e3 = new DefaultExchange(context);
+ e3.getIn().setBody("C");
+ e3.getIn().setHeader("id", 123);
+
+ Exchange e4 = new DefaultExchange(context);
+ e4.getIn().setBody("D");
+ e4.getIn().setHeader("id", 123);
+
+ ap.process(e1);
+
+ Thread.sleep(1500L);
+ ap.process(e2);
+
+ Thread.sleep(500L);
+ ap.process(e3);
+ ap.process(e4);
+
+ assertMockEndpointsSatisfied();
+
+ ap.stop();
+ }
public void testAggregateIgnoreInvalidCorrelationKey() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");