You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/10/08 05:30:25 UTC
[4/8] camel git commit: CAMEL-9193: Aggregator in preCompletion mode
should also timeout if the new group does not receive further messages
CAMEL-9193: Aggregator in preCompletion mode should also timeout if the new group does not receive further messages
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/925265c4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/925265c4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/925265c4
Branch: refs/heads/camel-2.16.x
Commit: 925265c4c3ce251a1bf4d00811b1d0c9e0a42af8
Parents: ff51e64
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Oct 6 10:16:55 2015 +0200
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Oct 8 11:24:32 2015 +0800
----------------------------------------------------------------------
.../processor/aggregate/AggregateProcessor.java | 5 ++++-
...gatePreCompleteAwareStrategyTimeoutTest.java | 22 ++++++++++++++++++++
camel-core/src/test/resources/log4j.properties | 4 ++--
3 files changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/925265c4/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 5c400f6..822e831 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -438,6 +438,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
originalExchange = null;
// and reset the size to 1
size = 1;
+ // make sure to track timeout as we just restart the correlation group when we are in pre completion mode
+ trackTimeout(key, newExchange);
}
// aggregate the exchanges
@@ -495,7 +497,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
batchConsumerCorrelationKeys.clear();
// we have already submitted to completion, so answer should be null
answer = null;
- } else {
+ } else if (answer != null) {
// we are complete for this exchange
answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
answer = onCompletion(key, originalExchange, answer, false);
@@ -650,6 +652,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
if (!fromTimeout && timeoutMap != null) {
// cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker)
+ LOG.trace("Removing correlation key {} from timeout", key);
timeoutMap.remove(key);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/925265c4/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
index abfda10..ffcb03f 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
@@ -40,6 +40,28 @@ public class AggregatePreCompleteAwareStrategyTimeoutTest extends ContextTestSup
assertMockEndpointsSatisfied();
}
+ public void testAggregatePreCompleteTimeoutOnlyOneInLastGroup() throws Exception {
+ getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E", "X");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "X", "id", 123);
+ template.sendBodyAndHeader("direct:start", "D", "id", 123);
+ template.sendBodyAndHeader("direct:start", "E", "id", 123);
+ template.sendBodyAndHeader("direct:start", "X", "id", 123);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testAggregatePreCompleteTimeoutOnlyOneInFirstGroup() throws Exception {
+ getMockEndpoint("mock:aggregated").expectedBodiesReceived("X");
+
+ template.sendBodyAndHeader("direct:start", "X", "id", 123);
+
+ assertMockEndpointsSatisfied();
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
http://git-wip-us.apache.org/repos/asf/camel/blob/925265c4/camel-core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/camel-core/src/test/resources/log4j.properties b/camel-core/src/test/resources/log4j.properties
index e91928c..f8ecded 100644
--- a/camel-core/src/test/resources/log4j.properties
+++ b/camel-core/src/test/resources/log4j.properties
@@ -23,7 +23,7 @@ log4j.logger.org.apache.camel.customlogger=TRACE, file2
#log4j.logger.org.apache.camel.impl.converter=WARN
#log4j.logger.org.apache.camel.management=DEBUG
-log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
+#log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
#log4j.logger.org.apache.camel.impl.converter.DefaultTypeConverter=TRACE
#log4j.logger.org.apache.camel.impl.converter=DEBUG
@@ -49,7 +49,7 @@ log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
#log4j.logger.org.apache.camel.processor.loadbalancer=TRACE
#log4j.logger.org.apache.camel.processor.Delayer=TRACE
#log4j.logger.org.apache.camel.processor.Throttler=TRACE
-#log4j.logger.org.apache.camel.processor.aggregate.AggregateProcessor=DEBUG
+#log4j.logger.org.apache.camel.processor.aggregate.AggregateProcessor=TRACE
#log4j.logger.org.apache.camel.impl=TRACE
#log4j.logger.org.apache.camel.util.FileUtil=TRACE
#log4j.logger.org.apache.camel.util.AsyncProcessorHelper=TRACE