You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/07/05 12:57:32 UTC
[camel] branch camel-2.22.x updated: CAMEL-12589: Fixed aggregator
eip to not propagate control headers for force completion so another
aggregator will not also force again.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-2.22.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.22.x by this push:
new 75a0aec CAMEL-12589: Fixed aggregator eip to not propagate control headers for force completion so another aggregator will not also force again.
75a0aec is described below
commit 75a0aec3abd1f7d6f0e46ed0695444ba4c2e75b0
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jul 5 14:56:38 2018 +0200
CAMEL-12589: Fixed aggregator eip to not propagate control headers for force completion so another aggregator will not also force again.
---
.../camel/processor/aggregate/AggregateProcessor.java | 17 ++++++++++++++++-
.../aggregator/AggregateForceCompletionHeaderTest.java | 2 ++
2 files changed, 18 insertions(+), 1 deletion(-)
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 849687f..f671941 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -285,6 +285,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
//check for the special header to force completion of all groups (and ignore the exchange otherwise)
boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
if (completeAllGroups) {
+ // remove the header so we do not complete again
+ exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
forceCompletionOfAllGroups();
return;
}
@@ -316,6 +318,12 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
// copy exchange, and do not share the unit of work
// the aggregated output runs in another unit of work
Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+
+ // remove the complete all groups headers as it should not be on the copy
+ copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
+ copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+ copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+
try {
aggregated = doAggregation(key, copy);
exhaustedRetries = false;
@@ -341,10 +349,15 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
// the aggregated output runs in another unit of work
Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+ // remove the complete all groups headers as it should not be on the copy
+ copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
+ copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+ copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+
// when memory based then its fast using synchronized, but if the aggregation repository is IO
// bound such as JPA etc then concurrent aggregation per correlation key could
// improve performance as we can run aggregation repository get/add in parallel
- List<Exchange> aggregated = null;
+ List<Exchange> aggregated;
lock.lock();
try {
aggregated = doAggregation(key, copy);
@@ -362,6 +375,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
// check for the special header to force completion of all groups (inclusive of the message)
boolean completeAllGroupsInclusive = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class);
if (completeAllGroupsInclusive) {
+ // remove the header so we do not complete again
+ exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
forceCompletionOfAllGroups();
}
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
index 123721d..11e1c3a9 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
@@ -45,6 +45,7 @@ public class AggregateForceCompletionHeaderTest extends ContextTestSupport {
getMockEndpoint("mock:aggregated").expectedMessageCount(2);
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
+ getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
//now send the signal message to trigger completion of all groups, message should NOT be aggregated
template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
@@ -67,6 +68,7 @@ public class AggregateForceCompletionHeaderTest extends ContextTestSupport {
getMockEndpoint("mock:aggregated").expectedMessageCount(3);
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5");
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
+ getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE).isNull();
//now send a message to trigger completion of all groups, message should be aggregated
Map<String, Object> headers = new HashMap<>();