You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/10/08 05:30:25 UTC

[4/8] camel git commit: CAMEL-9193: Aggregator in preCompletion mode should also time out if the new group does not receive further messages

CAMEL-9193: Aggregator in preCompletion mode should also time out if the new group does not receive further messages


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/925265c4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/925265c4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/925265c4

Branch: refs/heads/camel-2.16.x
Commit: 925265c4c3ce251a1bf4d00811b1d0c9e0a42af8
Parents: ff51e64
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Oct 6 10:16:55 2015 +0200
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Oct 8 11:24:32 2015 +0800

----------------------------------------------------------------------
 .../processor/aggregate/AggregateProcessor.java |  5 ++++-
 ...gatePreCompleteAwareStrategyTimeoutTest.java | 22 ++++++++++++++++++++
 camel-core/src/test/resources/log4j.properties  |  4 ++--
 3 files changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/925265c4/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 5c400f6..822e831 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -438,6 +438,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             originalExchange = null;
             // and reset the size to 1
             size = 1;
+            // make sure to track timeout as we just restart the correlation group when we are in pre completion mode
+            trackTimeout(key, newExchange);
         }
 
         // aggregate the exchanges
@@ -495,7 +497,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             batchConsumerCorrelationKeys.clear();
             // we have already submitted to completion, so answer should be null
             answer = null;
-        } else {
+        } else if (answer != null) {
             // we are complete for this exchange
             answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
             answer = onCompletion(key, originalExchange, answer, false);
@@ -650,6 +652,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
 
         if (!fromTimeout && timeoutMap != null) {
             // cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker)
+            LOG.trace("Removing correlation key {} from timeout", key);
             timeoutMap.remove(key);
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/925265c4/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
index abfda10..ffcb03f 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
@@ -40,6 +40,28 @@ public class AggregatePreCompleteAwareStrategyTimeoutTest extends ContextTestSup
         assertMockEndpointsSatisfied();
     }
 
+    public void testAggregatePreCompleteTimeoutOnlyOneInLastGroup() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E", "X");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testAggregatePreCompleteTimeoutOnlyOneInFirstGroup() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("X");
+
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {

http://git-wip-us.apache.org/repos/asf/camel/blob/925265c4/camel-core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/camel-core/src/test/resources/log4j.properties b/camel-core/src/test/resources/log4j.properties
index e91928c..f8ecded 100644
--- a/camel-core/src/test/resources/log4j.properties
+++ b/camel-core/src/test/resources/log4j.properties
@@ -23,7 +23,7 @@ log4j.logger.org.apache.camel.customlogger=TRACE, file2
 
 #log4j.logger.org.apache.camel.impl.converter=WARN
 #log4j.logger.org.apache.camel.management=DEBUG
-log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
+#log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
 #log4j.logger.org.apache.camel.impl.converter.DefaultTypeConverter=TRACE
 #log4j.logger.org.apache.camel.impl.converter=DEBUG
 
@@ -49,7 +49,7 @@ log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
 #log4j.logger.org.apache.camel.processor.loadbalancer=TRACE
 #log4j.logger.org.apache.camel.processor.Delayer=TRACE
 #log4j.logger.org.apache.camel.processor.Throttler=TRACE
-#log4j.logger.org.apache.camel.processor.aggregate.AggregateProcessor=DEBUG
+#log4j.logger.org.apache.camel.processor.aggregate.AggregateProcessor=TRACE
 #log4j.logger.org.apache.camel.impl=TRACE
 #log4j.logger.org.apache.camel.util.FileUtil=TRACE
 #log4j.logger.org.apache.camel.util.AsyncProcessorHelper=TRACE