You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/25 11:03:49 UTC

[camel] branch camel-2.25.x updated: back port fix for group aggregation for Camel 2.25.x (#3514)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.25.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.25.x by this push:
     new 3bf5aed  back port fix for group aggregation for Camel 2.25.x  (#3514)
3bf5aed is described below

commit 3bf5aed38f0cfd22711cd0ce9f08619eee5e41ad
Author: grigoni <gi...@gmail.com>
AuthorDate: Sat Jan 25 12:03:35 2020 +0100

    back port fix for group aggregation for Camel 2.25.x  (#3514)
    
    * CAMEL-14414: for aggregation group completion setting and removal is handled through methods to enforce code cohesion and avoid repetitions. fixed issue for CURRENT_GROUP and ALL_GROUPS flag removal. adapted documentation accordingly
    
    * CAMEL-14414: adapted group aggregation related test to match expectation that group flags are cleaned up after processing
    
    * CAMEL-14414: polish
---
 camel-core/src/main/docs/eips/aggregate-eip.adoc   |  2 +-
 .../processor/aggregate/AggregateProcessor.java    | 65 +++++++++++++++++-----
 ...eCompletionHeaderInAggregationStrategyTest.java |  2 +
 .../AggregateForceCompletionHeaderTest.java        |  3 +-
 .../AggregationStrategyCompleteByPropertyTest.java |  8 +++
 .../HawtDBAggregateForceCompletionHeaderTest.java  |  2 +-
 .../LevelDBAggregateForceCompletionHeaderTest.java |  2 +-
 .../JdbcAggregateForceCompletionHeaderTest.java    |  2 +-
 8 files changed, 66 insertions(+), 20 deletions(-)

diff --git a/camel-core/src/main/docs/eips/aggregate-eip.adoc b/camel-core/src/main/docs/eips/aggregate-eip.adoc
index f7f2699..900350a 100644
--- a/camel-core/src/main/docs/eips/aggregate-eip.adoc
+++ b/camel-core/src/main/docs/eips/aggregate-eip.adoc
@@ -286,7 +286,7 @@ setting the property `Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` to `true`.
 *Available as of Camel 2.9*
 
 You can manually trigger completion of all current aggregated exchanges
-by sending a message containing the header
+by sending an exchange containing the property
 `Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` set to `true`. The message is
 considered a signal message only, the message headers/contents will not
 be processed otherwise.
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index c32d34a..41971cf 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -283,10 +283,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         }
 
         //check for the special header to force completion of all groups (and ignore the exchange otherwise)
-        boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
+        boolean completeAllGroups = isCompleteAllGroups(exchange);
         if (completeAllGroups) {
             // remove the header so we do not complete again
-            exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+            removeFlagCompleteAllGroups(exchange);
             forceCompletionOfAllGroups();
             return;
         }
@@ -320,9 +320,9 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
                 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
 
                 // remove the complete all groups headers as it should not be on the copy
-                copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
-                copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
-                copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+                removeFlagCompleteCurrentGroup(copy);
+                removeFlagCompleteAllGroups(copy);
+                removeFlagCompleteAllGroupsInclusive(copy);
 
                 try {
                     aggregated = doAggregation(key, copy);
@@ -350,9 +350,9 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
 
             // remove the complete all groups headers as it should not be on the copy
-            copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
-            copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
-            copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+            removeFlagCompleteCurrentGroup(copy);
+            removeFlagCompleteAllGroups(copy);
+            removeFlagCompleteAllGroupsInclusive(copy);
 
             // when memory based then its fast using synchronized, but if the aggregation repository is IO
             // bound such as JPA etc then concurrent aggregation per correlation key could
@@ -372,15 +372,49 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             }
         }
 
-        // check for the special header to force completion of all groups (inclusive of the message)
-        boolean completeAllGroupsInclusive = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class);
+        // check for the special flag to force completion of all groups (inclusive of the message)
+        boolean completeAllGroupsInclusive = isCompleteAllGroupsInclusive(exchange);
         if (completeAllGroupsInclusive) {
-            // remove the header so we do not complete again
-            exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+            // remove the flag so we do not complete again
+            removeFlagCompleteAllGroupsInclusive(exchange);
             forceCompletionOfAllGroups();
         }
     }
 
+    private Object removeFlagCompleteCurrentGroup(Exchange exchange) {
+        //before everywhere : return exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
+        return exchange.removeProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
+    }
+
+    private Boolean isCompleteCurrentGroup(Exchange exchange) {
+        return exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, false, boolean.class);
+    }
+
+    private Object removeFlagCompleteAllGroups(Exchange exchange) {
+        Object removedHeader = exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+        Object removedProp = exchange.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+        return removedHeader == null ? removedProp: removedHeader;
+    }
+
+    private Boolean isCompleteAllGroups(Exchange exchange) {
+        boolean retVal;
+        retVal = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
+        if(!retVal) {
+            // according to doc it is a property but it is sometimes read as header
+            // some test don't fail because they use the header expression which contains a fallback to properties
+            retVal = exchange.getProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
+        }
+        return retVal;
+    }
+
+    private Object removeFlagCompleteAllGroupsInclusive(Exchange exchange) {
+        return exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+    }
+
+    private Boolean isCompleteAllGroupsInclusive(Exchange exchange) {
+        return exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class);
+    }
+
     /**
      * Aggregates the exchange with the given correlation key
      * <p/>
@@ -473,10 +507,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         }
 
         // check for the special exchange property to force completion of all groups
-        boolean completeAllGroups = answer.getProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
+        boolean completeAllGroups = isCompleteAllGroups(answer);
         if (completeAllGroups) {
             // remove the exchange property so we do not complete again
-            answer.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+            removeFlagCompleteAllGroups(answer);
             forceCompletionOfAllGroups();
         } else if (isCompletionOnNewCorrelationGroup() && originalExchange == null) {
             // its a new group so force complete of all existing groups
@@ -613,7 +647,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             }
         }
 
-        if (exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, false, boolean.class)) {
+        if (isCompleteCurrentGroup(exchange)) {
+            removeFlagCompleteCurrentGroup(exchange);
             return "strategy";
         }
 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
index 8f1a16a..f458492 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
@@ -30,6 +30,8 @@ public class AggregateForceCompletionHeaderInAggregationStrategyTest extends Con
     @Test
     public void testCompletePreviousOnNewGroup() throws Exception {
         getMockEndpoint("mock:aggregated").expectedBodiesReceived("AAA", "BB");
+        getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
+        getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
 
         template.sendBody("direct:start", "A,A,A,B,B");
 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
index 11e1c3a9..576e806 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
@@ -46,9 +46,10 @@ public class AggregateForceCompletionHeaderTest extends ContextTestSupport {
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
         getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
         getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
+        getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
 
         //now send the signal message to trigger completion of all groups, message should NOT be aggregated
-        template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+        template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
 
         assertMockEndpointsSatisfied();
     }
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java
index b69d5bd..dd56568 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java
@@ -35,6 +35,14 @@ public class AggregationStrategyCompleteByPropertyTest extends ContextTestSuppor
         result.message(0).exchangeProperty(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("strategy");
         result.message(1).exchangeProperty(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("strategy");
 
+        // org.apache.camel.builder.ExpressionBuilder.headerExpression(java.lang.String) is going to property fallback
+        // the test (without the fix) will fail into error:
+        // java.lang.AssertionError: Assertion error at index 0 on mock mock://aggregated with predicate:
+        // header(CamelAggregationCompleteCurrentGroup) is null evaluated as: true is null on Exchange[ID-MacBook-Pro-1578822701664-0-2]
+        getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP).isNull();
+        // according to manual
+        getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP).isNull();
+
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
         template.sendBodyAndHeader("direct:start", "C", "id", 123);
diff --git a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
index 8fd453f..db2be47 100644
--- a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
+++ b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
@@ -54,7 +54,7 @@ public class HawtDBAggregateForceCompletionHeaderTest extends CamelTestSupport {
         getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         //now send the signal message to trigger completion of all groups, message should NOT be aggregated
-        template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+        template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
 
         assertMockEndpointsSatisfied();
     }
diff --git a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
index 598ae86..4dfc1fb 100644
--- a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
+++ b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
@@ -54,7 +54,7 @@ public class LevelDBAggregateForceCompletionHeaderTest extends CamelTestSupport
         getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         //now send the signal message to trigger completion of all groups, message should NOT be aggregated
-        template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+        template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
 
         assertMockEndpointsSatisfied();
     }
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
index 5dfb8e1..3df38bb 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
@@ -45,7 +45,7 @@ public class JdbcAggregateForceCompletionHeaderTest extends AbstractJdbcAggregat
         getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         //now send the signal message to trigger completion of all groups, message should NOT be aggregated
-        template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+        template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
 
         assertMockEndpointsSatisfied();
     }