You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/07/05 12:57:35 UTC

[camel] branch camel-2.21.x updated: CAMEL-12589: Fixed aggregator eip to not propagate control headers for force completion so another aggregator will not also force again.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.21.x by this push:
     new 0475ddc  CAMEL-12589: Fixed aggregator eip to not propagate control headers for force completion so another aggregator will not also force again.
0475ddc is described below

commit 0475ddc26c4e92780b6bc0585bf8f83a30810618
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jul 5 14:56:38 2018 +0200

    CAMEL-12589: Fixed aggregator eip to not propagate control headers for force completion so another aggregator will not also force again.
---
 .../camel/processor/aggregate/AggregateProcessor.java   | 17 ++++++++++++++++-
 .../aggregator/AggregateForceCompletionHeaderTest.java  |  2 ++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 3c4460b..a1e8c4a 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -285,6 +285,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         //check for the special header to force completion of all groups (and ignore the exchange otherwise)
         boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
         if (completeAllGroups) {
+            // remove the header so we do not complete again
+            exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
             forceCompletionOfAllGroups();
             return;
         }
@@ -316,6 +318,12 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
                 // copy exchange, and do not share the unit of work
                 // the aggregated output runs in another unit of work
                 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+
+                // remove the complete all groups headers as it should not be on the copy
+                copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
+                copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+                copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+
                 try {
                     aggregated = doAggregation(key, copy);
                     exhaustedRetries = false;
@@ -341,10 +349,15 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             // the aggregated output runs in another unit of work
             Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
 
+            // remove the complete all groups headers as it should not be on the copy
+            copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
+            copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+            copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+
             // when memory based then its fast using synchronized, but if the aggregation repository is IO
             // bound such as JPA etc then concurrent aggregation per correlation key could
             // improve performance as we can run aggregation repository get/add in parallel
-            List<Exchange> aggregated = null;
+            List<Exchange> aggregated;
             lock.lock();
             try {
                 aggregated = doAggregation(key, copy);
@@ -362,6 +375,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         // check for the special header to force completion of all groups (inclusive of the message)
         boolean completeAllGroupsInclusive = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class);
         if (completeAllGroupsInclusive) {
+            // remove the header so we do not complete again
+            exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
             forceCompletionOfAllGroups();
         }
     }
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
index 2971825..f5b2546 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
@@ -45,6 +45,7 @@ public class AggregateForceCompletionHeaderTest extends ContextTestSupport {
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
         getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
+        getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
 
         //now send the signal message to trigger completion of all groups, message should NOT be aggregated
         template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
@@ -67,6 +68,7 @@ public class AggregateForceCompletionHeaderTest extends ContextTestSupport {
         getMockEndpoint("mock:aggregated").expectedMessageCount(3);
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5");
         getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
+        getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE).isNull();
 
         //now send a message to trigger completion of all groups, message should be aggregated
         Map<String, Object> headers = new HashMap<String, Object>();