You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/11 09:49:59 UTC

svn commit: r1200771 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java

Author: davsclaus
Date: Fri Nov 11 08:49:59 2011
New Revision: 1200771

URL: http://svn.apache.org/viewvc?rev=1200771&view=rev
Log:
CAMEL-4660: Aggregate EIP should not use 1s as the first timeout, but instead the configured value. Thanks to Ole Hofstad for the patch.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1200771&r1=1200770&r2=1200771&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Nov 11 08:49:59 2011
@@ -859,7 +859,7 @@ public class AggregateProcessor extends 
                 setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
             }
             // trigger completion based on interval
-            getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), 1000L, getCompletionInterval(), TimeUnit.MILLISECONDS);
+            getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS);
         }
 
         // start timeout service if its in use

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=1200771&r1=1200770&r2=1200771&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Fri Nov 11 08:49:59 2011
@@ -279,6 +279,52 @@ public class AggregateProcessorTest exte
 
         ap.stop();
     }
+    
+    public void testAggregateInitialCompletionInterval() throws Exception {
+        // camel context must be started
+        context.start();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("A+B", "C+D");
+        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "interval");
+
+        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+        Expression corr = header("id");
+        AggregationStrategy as = new BodyInAggregatingStrategy();
+
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        ap.setCompletionInterval(2000);
+        ap.start();
+
+        Exchange e1 = new DefaultExchange(context);
+        e1.getIn().setBody("A");
+        e1.getIn().setHeader("id", 123);
+
+        Exchange e2 = new DefaultExchange(context);
+        e2.getIn().setBody("B");
+        e2.getIn().setHeader("id", 123);
+
+        Exchange e3 = new DefaultExchange(context);
+        e3.getIn().setBody("C");
+        e3.getIn().setHeader("id", 123);
+
+        Exchange e4 = new DefaultExchange(context);
+        e4.getIn().setBody("D");
+        e4.getIn().setHeader("id", 123);
+
+        ap.process(e1);
+        
+        Thread.sleep(1500L);
+        ap.process(e2);
+        
+        Thread.sleep(500L);
+        ap.process(e3);
+        ap.process(e4);
+
+        assertMockEndpointsSatisfied();
+
+        ap.stop();
+    }
 
     public void testAggregateIgnoreInvalidCorrelationKey() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");