You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/25 11:03:49 UTC
[camel] branch camel-2.25.x updated: back port fix for group
aggregation for Camel 2.25.x (#3514)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-2.25.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.25.x by this push:
new 3bf5aed back port fix for group aggregation for Camel 2.25.x (#3514)
3bf5aed is described below
commit 3bf5aed38f0cfd22711cd0ce9f08619eee5e41ad
Author: grigoni <gi...@gmail.com>
AuthorDate: Sat Jan 25 12:03:35 2020 +0100
back port fix for group aggregation for Camel 2.25.x (#3514)
* CAMEL-14414: the setting and removal of aggregation group completion flags is now handled through dedicated methods, to improve code cohesion and avoid repetition. Fixed the removal of the CURRENT_GROUP and ALL_GROUPS flags. Adapted the documentation accordingly.
* CAMEL-14414: adapted the group-aggregation tests to match the expectation that group flags are cleaned up after processing
* CAMEL-14414: polish
---
camel-core/src/main/docs/eips/aggregate-eip.adoc | 2 +-
.../processor/aggregate/AggregateProcessor.java | 65 +++++++++++++++++-----
...eCompletionHeaderInAggregationStrategyTest.java | 2 +
.../AggregateForceCompletionHeaderTest.java | 3 +-
.../AggregationStrategyCompleteByPropertyTest.java | 8 +++
.../HawtDBAggregateForceCompletionHeaderTest.java | 2 +-
.../LevelDBAggregateForceCompletionHeaderTest.java | 2 +-
.../JdbcAggregateForceCompletionHeaderTest.java | 2 +-
8 files changed, 66 insertions(+), 20 deletions(-)
diff --git a/camel-core/src/main/docs/eips/aggregate-eip.adoc b/camel-core/src/main/docs/eips/aggregate-eip.adoc
index f7f2699..900350a 100644
--- a/camel-core/src/main/docs/eips/aggregate-eip.adoc
+++ b/camel-core/src/main/docs/eips/aggregate-eip.adoc
@@ -286,7 +286,7 @@ setting the property `Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` to `true`.
*Available as of Camel 2.9*
You can manually trigger completion of all current aggregated exchanges
-by sending a message containing the header
+by sending an exchange containing the property
`Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` set to `true`. The message is
considered a signal message only, the message headers/contents will not
be processed otherwise.
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index c32d34a..41971cf 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -283,10 +283,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
}
//check for the special header to force completion of all groups (and ignore the exchange otherwise)
- boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
+ boolean completeAllGroups = isCompleteAllGroups(exchange);
if (completeAllGroups) {
// remove the header so we do not complete again
- exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+ removeFlagCompleteAllGroups(exchange);
forceCompletionOfAllGroups();
return;
}
@@ -320,9 +320,9 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
// remove the complete all groups headers as it should not be on the copy
- copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
- copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
- copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+ removeFlagCompleteCurrentGroup(copy);
+ removeFlagCompleteAllGroups(copy);
+ removeFlagCompleteAllGroupsInclusive(copy);
try {
aggregated = doAggregation(key, copy);
@@ -350,9 +350,9 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
// remove the complete all groups headers as it should not be on the copy
- copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
- copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
- copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+ removeFlagCompleteCurrentGroup(copy);
+ removeFlagCompleteAllGroups(copy);
+ removeFlagCompleteAllGroupsInclusive(copy);
// when memory based then its fast using synchronized, but if the aggregation repository is IO
// bound such as JPA etc then concurrent aggregation per correlation key could
@@ -372,15 +372,49 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
}
}
- // check for the special header to force completion of all groups (inclusive of the message)
- boolean completeAllGroupsInclusive = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class);
+ // check for the special flag to force completion of all groups (inclusive of the message)
+ boolean completeAllGroupsInclusive = isCompleteAllGroupsInclusive(exchange);
if (completeAllGroupsInclusive) {
- // remove the header so we do not complete again
- exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+ // remove the flag so we do not complete again
+ removeFlagCompleteAllGroupsInclusive(exchange);
forceCompletionOfAllGroups();
}
}
+ private Object removeFlagCompleteCurrentGroup(Exchange exchange) {
+ //before everywhere : return exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
+ return exchange.removeProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
+ }
+
+ private Boolean isCompleteCurrentGroup(Exchange exchange) {
+ return exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, false, boolean.class);
+ }
+
+ private Object removeFlagCompleteAllGroups(Exchange exchange) {
+ Object removedHeader = exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+ Object removedProp = exchange.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+ return removedHeader == null ? removedProp: removedHeader;
+ }
+
+ private Boolean isCompleteAllGroups(Exchange exchange) {
+ boolean retVal;
+ retVal = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
+ if(!retVal) {
+ // according to doc it is a property but it is sometimes read as header
+ // some test don't fail because they use the header expression which contains a fallback to properties
+ retVal = exchange.getProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
+ }
+ return retVal;
+ }
+
+ private Object removeFlagCompleteAllGroupsInclusive(Exchange exchange) {
+ return exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+ }
+
+ private Boolean isCompleteAllGroupsInclusive(Exchange exchange) {
+ return exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class);
+ }
+
/**
* Aggregates the exchange with the given correlation key
* <p/>
@@ -473,10 +507,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
}
// check for the special exchange property to force completion of all groups
- boolean completeAllGroups = answer.getProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
+ boolean completeAllGroups = isCompleteAllGroups(answer);
if (completeAllGroups) {
// remove the exchange property so we do not complete again
- answer.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+ removeFlagCompleteAllGroups(answer);
forceCompletionOfAllGroups();
} else if (isCompletionOnNewCorrelationGroup() && originalExchange == null) {
// its a new group so force complete of all existing groups
@@ -613,7 +647,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
}
}
- if (exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, false, boolean.class)) {
+ if (isCompleteCurrentGroup(exchange)) {
+ removeFlagCompleteCurrentGroup(exchange);
return "strategy";
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
index 8f1a16a..f458492 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
@@ -30,6 +30,8 @@ public class AggregateForceCompletionHeaderInAggregationStrategyTest extends Con
@Test
public void testCompletePreviousOnNewGroup() throws Exception {
getMockEndpoint("mock:aggregated").expectedBodiesReceived("AAA", "BB");
+ getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
+ getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
template.sendBody("direct:start", "A,A,A,B,B");
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
index 11e1c3a9..576e806 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
@@ -46,9 +46,10 @@ public class AggregateForceCompletionHeaderTest extends ContextTestSupport {
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
+ getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
//now send the signal message to trigger completion of all groups, message should NOT be aggregated
- template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+ template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
assertMockEndpointsSatisfied();
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java
index b69d5bd..dd56568 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java
@@ -35,6 +35,14 @@ public class AggregationStrategyCompleteByPropertyTest extends ContextTestSuppor
result.message(0).exchangeProperty(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("strategy");
result.message(1).exchangeProperty(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("strategy");
+ // org.apache.camel.builder.ExpressionBuilder.headerExpression(java.lang.String) is going to property fallback
+ // the test (without the fix) will fail into error:
+ // java.lang.AssertionError: Assertion error at index 0 on mock mock://aggregated with predicate:
+ // header(CamelAggregationCompleteCurrentGroup) is null evaluated as: true is null on Exchange[ID-MacBook-Pro-1578822701664-0-2]
+ getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP).isNull();
+ // according to manual
+ getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP).isNull();
+
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);
template.sendBodyAndHeader("direct:start", "C", "id", 123);
diff --git a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
index 8fd453f..db2be47 100644
--- a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
+++ b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
@@ -54,7 +54,7 @@ public class HawtDBAggregateForceCompletionHeaderTest extends CamelTestSupport {
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
//now send the signal message to trigger completion of all groups, message should NOT be aggregated
- template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+ template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
assertMockEndpointsSatisfied();
}
diff --git a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
index 598ae86..4dfc1fb 100644
--- a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
+++ b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
@@ -54,7 +54,7 @@ public class LevelDBAggregateForceCompletionHeaderTest extends CamelTestSupport
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
//now send the signal message to trigger completion of all groups, message should NOT be aggregated
- template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+ template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
assertMockEndpointsSatisfied();
}
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
index 5dfb8e1..3df38bb 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
@@ -45,7 +45,7 @@ public class JdbcAggregateForceCompletionHeaderTest extends AbstractJdbcAggregat
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
//now send the signal message to trigger completion of all groups, message should NOT be aggregated
- template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+ template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
assertMockEndpointsSatisfied();
}