You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/11 10:12:34 UTC

svn commit: r1200781 - in /camel/branches/camel-2.8.x: ./ camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java

Author: davsclaus
Date: Fri Nov 11 09:12:33 2011
New Revision: 1200781

URL: http://svn.apache.org/viewvc?rev=1200781&view=rev
Log:
CAMEL-4660: Aggregate EIP should not use 1s as the first timeout, but instead the configured value. Thanks to Ole Hofstad for the patch.

Modified:
    camel/branches/camel-2.8.x/   (props changed)
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 09:12:33 2011
@@ -1 +1 @@
-/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948,1198199,1198338,1198340,1199123,1199137,1199654,1199683,1199703,1199739,1199804,1200214
+/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948,1198199,1198338,1198340,1199123,1199137,1199654,1199683,1199703,1199739,1199804,1200214,1200771

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1200781&r1=1200780&r2=1200781&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Nov 11 09:12:33 2011
@@ -844,7 +844,7 @@ public class AggregateProcessor extends 
             LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis.");
             ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1);
             // trigger completion based on interval
-            scheduler.scheduleAtFixedRate(new AggregationIntervalTask(), 1000L, getCompletionInterval(), TimeUnit.MILLISECONDS);
+            scheduler.scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS);
         }
 
         // start timeout service if its in use

Modified: camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=1200781&r1=1200780&r2=1200781&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Fri Nov 11 09:12:33 2011
@@ -279,6 +279,52 @@ public class AggregateProcessorTest exte
 
         ap.stop();
     }
+    
+    public void testAggregateInitialCompletionInterval() throws Exception { // CAMEL-4660: the first interval run must honour the configured completion interval
+        // camel context must be started
+        context.start();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("A+B", "C+D"); // two completed groups expected: A with B, then C with D
+        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "interval"); // completion must be flagged as triggered by the interval
+
+        Processor done = new SendProcessor(context.getEndpoint("mock:result")); // processor that sends to mock:result
+        Expression corr = header("id"); // correlation expression: the "id" header
+        AggregationStrategy as = new BodyInAggregatingStrategy(); // presumably joins bodies with "+" (see expected bodies) -- defined elsewhere, confirm
+
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        ap.setCompletionInterval(2000); // 2000 ms; also used as the delay before the first interval run (previously a fixed 1000 ms)
+        ap.start();
+
+        Exchange e1 = new DefaultExchange(context);
+        e1.getIn().setBody("A");
+        e1.getIn().setHeader("id", 123); // all four exchanges carry the same "id" header value
+
+        Exchange e2 = new DefaultExchange(context);
+        e2.getIn().setBody("B");
+        e2.getIn().setHeader("id", 123);
+
+        Exchange e3 = new DefaultExchange(context);
+        e3.getIn().setBody("C");
+        e3.getIn().setHeader("id", 123);
+
+        Exchange e4 = new DefaultExchange(context);
+        e4.getIn().setBody("D");
+        e4.getIn().setHeader("id", 123);
+
+        ap.process(e1);
+        
+        Thread.sleep(1500L); // beyond the old fixed 1000 ms initial delay, but still short of the 2000 ms interval
+        ap.process(e2); // B is expected to land in the same group as A ("A+B"), i.e. no interval run has happened yet
+        
+        Thread.sleep(500L); // roughly 2000 ms slept in total -- NOTE(review): the first interval run is expected around now; looks timing-sensitive, confirm
+        ap.process(e3); // C and D are expected to form the second group ("C+D")
+        ap.process(e4);
+
+        assertMockEndpointsSatisfied(); // checks the expectations registered on the mock endpoint above
+
+        ap.stop();
+    }
 
     public void testAggregateIgnoreInvalidCorrelationKey() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");