You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/23 11:54:13 UTC
[1/5] camel git commit: CAMEL-7433: Allow aggregation strategy to
determine pre complete when using aggregator.
Repository: camel
Updated Branches:
refs/heads/master 472903bf2 -> 3f9651578
CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/50945969
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/50945969
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/50945969
Branch: refs/heads/master
Commit: 5094596967e93417dc86e5f3fa38b08fe44c9797
Parents: 472903b
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 23 07:43:41 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 23 11:55:56 2015 +0100
----------------------------------------------------------------------
.../processor/aggregate/AggregateProcessor.java | 57 ++++++++++----------
.../PreCompletionAwareAggregationStrategy.java | 32 +++++++++++
2 files changed, 62 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/50945969/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index b71c0bf..9b93c36 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -422,44 +422,47 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
List<Exchange> list = new ArrayList<Exchange>();
- // only need to update aggregation repository if we are not complete
if (complete == null) {
+ // only need to update aggregation repository if we are not complete
doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
- // we are not complete so the answer should be null
- answer = null;
} else {
- // if batch consumer completion is enabled then we need to complete the group
- if ("consumer".equals(complete)) {
- for (String batchKey : batchConsumerCorrelationKeys) {
- Exchange batchAnswer;
- if (batchKey.equals(key)) {
- // skip the current aggregated key as we have already aggregated it and have the answer
- batchAnswer = answer;
- } else {
- batchAnswer = aggregationRepository.get(camelContext, batchKey);
- }
+ // if we are complete then add the answer to the list
+ doAggregationComplete(complete, list, key, originalExchange, answer);
+ }
- if (batchAnswer != null) {
- batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
- onCompletion(batchKey, originalExchange, batchAnswer, false);
- list.add(batchAnswer);
- }
+ LOG.trace("onAggregation +++ end +++ with correlation key: {}", key);
+ return list;
+ }
+
+ protected void doAggregationComplete(String complete, List<Exchange> list, String key, Exchange originalExchange, Exchange answer) {
+ if ("consumer".equals(complete)) {
+ for (String batchKey : batchConsumerCorrelationKeys) {
+ Exchange batchAnswer;
+ if (batchKey.equals(key)) {
+ // skip the current aggregated key as we have already aggregated it and have the answer
+ batchAnswer = answer;
+ } else {
+ batchAnswer = aggregationRepository.get(camelContext, batchKey);
+ }
+
+ if (batchAnswer != null) {
+ batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
+ onCompletion(batchKey, originalExchange, batchAnswer, false);
+ list.add(batchAnswer);
}
- batchConsumerCorrelationKeys.clear();
- // we have already submitted to completion, so answer should be null
- answer = null;
- } else {
- // we are complete for this exchange
- answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
- answer = onCompletion(key, originalExchange, answer, false);
}
+ batchConsumerCorrelationKeys.clear();
+ // we have already submitted to completion, so answer should be null
+ answer = null;
+ } else {
+ // we are complete for this exchange
+ answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
+ answer = onCompletion(key, originalExchange, answer, false);
}
- LOG.trace("onAggregation +++ end +++ with correlation key: {}", key);
if (answer != null) {
list.add(answer);
}
- return list;
}
protected void doAggregationRepositoryAdd(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) {
http://git-wip-us.apache.org/repos/asf/camel/blob/50945969/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
new file mode 100644
index 0000000..53fc3f0
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.Exchange;
+
+public interface PreCompletionAwareAggregationStrategy extends AggregationStrategy {
+
+ /**
+ * Determines if the aggregation should complete the current group, and start a new group, or the aggregation
+ * should continue using the current group.
+ *
+ * @param oldExchange the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange)
+ * @param newExchange the newest exchange (can be <tt>null</tt> if there was no data possible to acquire)
+ * @return <tt>true</tt> to complete current group and start a new group, or <tt>false</tt> to keep using current group
+ */
+ boolean preComplete(Exchange oldExchange, Exchange newExchange);
+}
[5/5] camel git commit: CAMEL-7433: Allow aggregation strategy to
determine pre complete when using aggregator.
Posted by da...@apache.org.
CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f965157
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f965157
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f965157
Branch: refs/heads/master
Commit: 3f9651578007dd42c02f80b92ba5d5d84097b094
Parents: efaa7bf
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 23 10:36:31 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 23 11:56:05 2015 +0100
----------------------------------------------------------------------
.../aggregate/PreCompletionAwareAggregationStrategy.java | 9 +++++++++
1 file changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3f965157/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
index 53fc3f0..c2734bf 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
@@ -18,6 +18,15 @@ package org.apache.camel.processor.aggregate;
import org.apache.camel.Exchange;
+/**
+ * A specialized {@link org.apache.camel.processor.aggregate.AggregationStrategy} which enables the aggregator to run
+ * in pre-completion mode. This allows the {@link #preComplete(org.apache.camel.Exchange, org.apache.camel.Exchange)} method
+ * to control the completion. Only completion timeout or interval can be used in addition; any other completion configuration
+ * is ignored.
+ * <p/>
+ * This strategy supports the use-case where an incoming Exchange has information that may trigger the completion
+ * of the current group; the new incoming Exchange is then used to start a new group from scratch.
+ */
public interface PreCompletionAwareAggregationStrategy extends AggregationStrategy {
/**
[4/5] camel git commit: CAMEL-7433: Allow aggregation strategy to
determine pre complete when using aggregator.
Posted by da...@apache.org.
CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/efaa7bf7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/efaa7bf7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/efaa7bf7
Branch: refs/heads/master
Commit: efaa7bf71a674ac7a98d43b9c187860b04eef9ad
Parents: 7973ac5
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 23 10:25:09 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 23 11:56:03 2015 +0100
----------------------------------------------------------------------
.../apache/camel/model/AggregateDefinition.java | 1 +
.../processor/aggregate/AggregateProcessor.java | 89 ++++++++++++++------
.../AggregatePreCompleteAwareStrategyTest.java | 54 ++++++++++++
...gatePreCompleteAwareStrategyTimeoutTest.java | 54 ++++++++++++
.../AggregatePredicateAwareStrategyTest.java | 53 ------------
5 files changed, 172 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 942d69b..cfcb027 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -41,6 +41,7 @@ import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
+import org.apache.camel.processor.aggregate.PreCompletionAwareAggregationStrategy;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.RouteContext;
http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index b365442..fbec104 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -92,6 +92,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
private final Processor processor;
private String id;
private AggregationStrategy aggregationStrategy;
+ private boolean preCompletion;
private Expression correlationExpression;
private AggregateController aggregateController;
private final ExecutorService executorService;
@@ -376,6 +377,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);
List<Exchange> list = new ArrayList<Exchange>();
+ String complete = null;
Exchange answer;
Exchange originalExchange = aggregationRepository.get(newExchange.getContext(), key);
@@ -396,31 +398,36 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
ExchangeHelper.prepareAggregation(oldExchange, newExchange);
// check if we are pre complete
- boolean preComplete;
- try {
- // put the current aggregated size on the exchange so its avail during completion check
- newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
- preComplete = onPreCompletionAggregation(oldExchange, newExchange);
- // remove it afterwards
- newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
- } catch (Throwable e) {
- // must catch any exception from aggregation
- throw new CamelExchangeException("Error occurred during preComplete", newExchange, e);
- }
-
- // check if we are complete
- String complete = null;
- if (!preComplete && isEagerCheckCompletion()) {
+ if (preCompletion) {
+ try {
+ // put the current aggregated size on the exchange so its avail during completion check
+ newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
+ complete = isPreCompleted(key, oldExchange, newExchange);
+ // make sure to track timeouts if not complete
+ if (complete == null) {
+ trackTimeout(key, newExchange);
+ }
+ // remove it afterwards
+ newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
+ } catch (Throwable e) {
+ // must catch any exception from aggregation
+ throw new CamelExchangeException("Error occurred during preComplete", newExchange, e);
+ }
+ } else if (isEagerCheckCompletion()) {
// put the current aggregated size on the exchange so its avail during completion check
newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
complete = isCompleted(key, newExchange);
+ // make sure to track timeouts if not complete
+ if (complete == null) {
+ trackTimeout(key, newExchange);
+ }
// remove it afterwards
newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
}
- if (preComplete) {
+ if (preCompletion && complete != null) {
// need to pre complete the current group before we aggregate
- doAggregationComplete("strategy", list, key, originalExchange, oldExchange);
+ doAggregationComplete(complete, list, key, originalExchange, oldExchange);
// as we complete the current group eager, we should indicate the new group is not complete
complete = null;
// and clear old/original exchange as we start on a new group
@@ -445,8 +452,12 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
answer.setProperty(Exchange.AGGREGATED_SIZE, size);
// maybe we should check completion after the aggregation
- if (!isEagerCheckCompletion()) {
+ if (!preCompletion && !isEagerCheckCompletion()) {
complete = isCompleted(key, answer);
+ // make sure to track timeouts if not complete
+ if (complete == null) {
+ trackTimeout(key, newExchange);
+ }
}
if (complete == null) {
@@ -515,6 +526,22 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
}
/**
+ * Tests whether the given exchanges are pre-complete or not
+ *
+ * @param key the correlation key
+ * @param oldExchange the existing exchange
+ * @param newExchange the incoming exchange
+ * @return <tt>null</tt> if not pre-completed, otherwise a String with the type that triggered the pre-completion
+ */
+ protected String isPreCompleted(String key, Exchange oldExchange, Exchange newExchange) {
+ boolean complete = false;
+ if (aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) {
+ complete = ((PreCompletionAwareAggregationStrategy) aggregationStrategy).preComplete(oldExchange, newExchange);
+ }
+ return complete ? "strategy" : null;
+ }
+
+ /**
* Tests whether the given exchange is complete or not
*
* @param key the correlation key
@@ -564,6 +591,11 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
}
}
+ // not complete
+ return null;
+ }
+
+ protected void trackTimeout(String key, Exchange exchange) {
// timeout can be either evaluated based on an expression or from a fixed value
// expression takes precedence
boolean timeoutSet = false;
@@ -586,9 +618,6 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
}
addExchangeToTimeoutMap(key, exchange, getCompletionTimeout());
}
-
- // not complete
- return null;
}
protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) {
@@ -1182,11 +1211,19 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
@Override
protected void doStart() throws Exception {
- if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
- && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null
- && getCompletionSizeExpression() == null) {
- throw new IllegalStateException("At least one of the completions options"
- + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
+ if (aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) {
+ preCompletion = true;
+ LOG.info("PreCompletionAwareAggregationStrategy detected. Aggregator {} is in pre-completion mode.", getId());
+ }
+
+ if (!preCompletion) {
+ // if not in pre completion mode then check we configured the completion required
+ if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
+ && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null
+ && getCompletionSizeExpression() == null) {
+ throw new IllegalStateException("At least one of the completions options"
+ + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
+ }
}
if (getCloseCorrelationKeyOnCompletion() != null) {
http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java
new file mode 100644
index 0000000..f965c90
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInPreCompleteAggregatingStrategy;
+
+/**
+ * @version
+ */
+public class AggregatePreCompleteAwareStrategyTest extends ContextTestSupport {
+
+ public void testAggregatePreComplete() throws Exception {
+ getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "X", "id", 123);
+ template.sendBodyAndHeader("direct:start", "D", "id", 123);
+ template.sendBodyAndHeader("direct:start", "E", "id", 123);
+ template.sendBodyAndHeader("direct:start", "X", "id", 123);
+ template.sendBodyAndHeader("direct:start", "F", "id", 123);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInPreCompleteAggregatingStrategy())
+ .to("mock:aggregated");
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
new file mode 100644
index 0000000..abfda10
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInPreCompleteAggregatingStrategy;
+
+/**
+ * @version
+ */
+public class AggregatePreCompleteAwareStrategyTimeoutTest extends ContextTestSupport {
+
+ public void testAggregatePreCompleteTimeout() throws Exception {
+ getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E", "X+F");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "X", "id", 123);
+ template.sendBodyAndHeader("direct:start", "D", "id", 123);
+ template.sendBodyAndHeader("direct:start", "E", "id", 123);
+ template.sendBodyAndHeader("direct:start", "X", "id", 123);
+ template.sendBodyAndHeader("direct:start", "F", "id", 123);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInPreCompleteAggregatingStrategy()).completionTimeout(1000)
+ .to("mock:aggregated");
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
deleted file mode 100644
index 74fe19b..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor.aggregator;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.processor.BodyInPreCompleteAggregatingStrategy;
-
-/**
- * @version
- */
-public class AggregatePredicateAwareStrategyTest extends ContextTestSupport {
-
- public void testAggregatePreComplete() throws Exception {
- getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E");
-
- template.sendBodyAndHeader("direct:start", "A", "id", 123);
- template.sendBodyAndHeader("direct:start", "B", "id", 123);
- template.sendBodyAndHeader("direct:start", "C", "id", 123);
- template.sendBodyAndHeader("direct:start", "X", "id", 123);
- template.sendBodyAndHeader("direct:start", "D", "id", 123);
- template.sendBodyAndHeader("direct:start", "E", "id", 123);
- template.sendBodyAndHeader("direct:start", "X", "id", 123);
-
- assertMockEndpointsSatisfied();
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("direct:start")
- .aggregate(header("id"), new BodyInPreCompleteAggregatingStrategy()).completionSize(5)
- .to("mock:aggregated");
- }
- };
- }
-}
\ No newline at end of file
[2/5] camel git commit: CAMEL-7433: Allow aggregation strategy to
determine pre complete when using aggregator.
Posted by da...@apache.org.
CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0c7b6d22
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0c7b6d22
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0c7b6d22
Branch: refs/heads/master
Commit: 0c7b6d22be17f82375ce94aaac4477f45260a171
Parents: 5094596
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 23 08:08:42 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 23 11:55:58 2015 +0100
----------------------------------------------------------------------
.../processor/aggregate/AggregateProcessor.java | 46 ++++++++++++++---
.../BodyInPreCompleteAggregatingStrategy.java | 40 +++++++++++++++
.../AggregatePredicateAwareStrategyTest.java | 53 ++++++++++++++++++++
3 files changed, 133 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0c7b6d22/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 9b93c36..b365442 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -375,6 +375,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
private List<Exchange> doAggregation(String key, Exchange newExchange) throws CamelExchangeException {
LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);
+ List<Exchange> list = new ArrayList<Exchange>();
+
Exchange answer;
Exchange originalExchange = aggregationRepository.get(newExchange.getContext(), key);
Exchange oldExchange = originalExchange;
@@ -390,9 +392,25 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
size++;
}
+ // prepare the exchanges for aggregation
+ ExchangeHelper.prepareAggregation(oldExchange, newExchange);
+
+ // check if we are pre complete
+ boolean preComplete;
+ try {
+ // put the current aggregated size on the exchange so its avail during completion check
+ newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
+ preComplete = onPreCompletionAggregation(oldExchange, newExchange);
+ // remove it afterwards
+ newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
+ } catch (Throwable e) {
+ // must catch any exception from aggregation
+ throw new CamelExchangeException("Error occurred during preComplete", newExchange, e);
+ }
+
// check if we are complete
String complete = null;
- if (isEagerCheckCompletion()) {
+ if (!preComplete && isEagerCheckCompletion()) {
// put the current aggregated size on the exchange so its avail during completion check
newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
complete = isCompleted(key, newExchange);
@@ -400,12 +418,23 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
}
- // prepare the exchanges for aggregation and then aggregate them
- ExchangeHelper.prepareAggregation(oldExchange, newExchange);
- // must catch any exception from aggregation
+ if (preComplete) {
+ // need to pre complete the current group before we aggregate
+ doAggregationComplete("strategy", list, key, originalExchange, oldExchange);
+ // as we complete the current group eager, we should indicate the new group is not complete
+ complete = null;
+ // and clear old/original exchange as we start on a new group
+ oldExchange = null;
+ originalExchange = null;
+ // and reset the size to 1
+ size = 1;
+ }
+
+ // aggregate the exchanges
try {
answer = onAggregation(oldExchange, newExchange);
} catch (Throwable e) {
+ // must catch any exception from aggregation
throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
}
if (answer == null) {
@@ -420,8 +449,6 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
complete = isCompleted(key, answer);
}
- List<Exchange> list = new ArrayList<Exchange>();
-
if (complete == null) {
// only need to update aggregation repository if we are not complete
doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
@@ -568,6 +595,13 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
return aggregationStrategy.aggregate(oldExchange, newExchange);
}
+ /**
+  * Asks the aggregation strategy whether the current group should be pre completed.
+  * <p/>
+  * Only a {@link PreCompletionAwareAggregationStrategy} can make that decision;
+  * for any other kind of strategy there is never a pre completion.
+  *
+  * @param oldExchange the old exchange, handed to the strategy as-is
+  * @param newExchange the new exchange, handed to the strategy as-is
+  * @return the answer from the strategy, or <tt>false</tt> if the strategy is not pre completion aware
+  */
+ protected boolean onPreCompletionAggregation(Exchange oldExchange, Exchange newExchange) {
+     if (!(aggregationStrategy instanceof PreCompletionAwareAggregationStrategy)) {
+         // the strategy has no say in pre completion
+         return false;
+     }
+     PreCompletionAwareAggregationStrategy aware = (PreCompletionAwareAggregationStrategy) aggregationStrategy;
+     return aware.preComplete(oldExchange, newExchange);
+ }
+
protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout) {
// store the correlation key as property before we remove so the repository has that information
if (original != null) {
http://git-wip-us.apache.org/repos/asf/camel/blob/0c7b6d22/camel-core/src/test/java/org/apache/camel/processor/BodyInPreCompleteAggregatingStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/BodyInPreCompleteAggregatingStrategy.java b/camel-core/src/test/java/org/apache/camel/processor/BodyInPreCompleteAggregatingStrategy.java
new file mode 100644
index 0000000..11a87c8
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/BodyInPreCompleteAggregatingStrategy.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.PreCompletionAwareAggregationStrategy;
+
+/**
+ * Aggregation strategy for tests which joins the message bodies as Strings
+ * separated by <tt>+</tt>, and which asks for pre completion of the current group
+ * whenever the body of the new exchange contains an <tt>X</tt>.
+ */
+public class BodyInPreCompleteAggregatingStrategy implements PreCompletionAwareAggregationStrategy {
+
+    /**
+     * Appends the body of the new exchange to the body of the old exchange.
+     *
+     * @param oldExchange the exchange aggregated so far, or <tt>null</tt> if there is none yet
+     * @param newExchange the incoming exchange
+     * @return the new exchange if there is no old exchange, otherwise the old exchange with the joined body
+     */
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        if (oldExchange == null) {
+            return newExchange;
+        }
+
+        String oldBody = oldExchange.getIn().getBody(String.class);
+        String newBody = newExchange.getIn().getBody(String.class);
+        oldExchange.getIn().setBody(oldBody + "+" + newBody);
+        return oldExchange;
+    }
+
+    /**
+     * Decides whether the current group should be completed before the new exchange is aggregated.
+     *
+     * @param oldExchange the exchange aggregated so far (not used)
+     * @param newExchange the incoming exchange
+     * @return <tt>true</tt> if the body of the new exchange, as a String, contains an <tt>X</tt>
+     */
+    @Override
+    public boolean preComplete(Exchange oldExchange, Exchange newExchange) {
+        // pre complete when new body has an X
+        String newBody = newExchange.getIn().getBody(String.class);
+        // the body can be null (no body, or not convertible to String), which must not cause a NPE
+        return newBody != null && newBody.contains("X");
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0c7b6d22/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
new file mode 100644
index 0000000..74fe19b
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInPreCompleteAggregatingStrategy;
+
+/**
+ * @version
+ */
+public class AggregatePredicateAwareStrategyTest extends ContextTestSupport {
+
+ public void testAggregatePreComplete() throws Exception {
+ getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "X", "id", 123);
+ template.sendBodyAndHeader("direct:start", "D", "id", 123);
+ template.sendBodyAndHeader("direct:start", "E", "id", 123);
+ template.sendBodyAndHeader("direct:start", "X", "id", 123);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInPreCompleteAggregatingStrategy()).completionSize(5)
+ .to("mock:aggregated");
+ }
+ };
+ }
+}
\ No newline at end of file
[3/5] camel git commit: CAMEL-8527: Processor in routes should be
IdAware
Posted by da...@apache.org.
CAMEL-8527: Processor in routes should be IdAware
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7973ac5f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7973ac5f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7973ac5f
Branch: refs/heads/master
Commit: 7973ac5f1baba82c4a8b5340d441bf6881a772c6
Parents: 0c7b6d2
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 23 08:23:09 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 23 11:56:00 2015 +0100
----------------------------------------------------------------------
.../java/org/apache/camel/CamelContext.java | 20 ++++++++++++++++++++
.../apache/camel/impl/DefaultCamelContext.java | 16 ++++++++++++++++
2 files changed, 36 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/7973ac5f/camel-core/src/main/java/org/apache/camel/CamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java
index b92349e..0b7d8b6 100644
--- a/camel-core/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java
@@ -522,6 +522,16 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
Processor getProcessor(String id);
/**
+ * Gets the processor with the given id from any of the routes
+ *
+ * @param id id of the processor
+ * @param type the processor type
+ * @return the processor or <tt>null</tt> if not found
+ * @throws java.lang.ClassCastException is thrown if the processor is not of the given type
+ */
+ <T extends Processor> T getProcessor(String id, Class<T> type);
+
+ /**
* Gets the processor definition with the given id from any of the routes
*
* @param id id of the processor definition
@@ -530,6 +540,16 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
ProcessorDefinition getProcessorDefinition(String id);
/**
+ * Gets the processor definition with the given id from any of the routes
+ *
+ * @param id id of the processor definition
+ * @param type the processor definition type
+ * @return the processor definition or <tt>null</tt> if not found
+ * @throws java.lang.ClassCastException is thrown if the processor definition is not of the given type
+ */
+ <T extends ProcessorDefinition> T getProcessorDefinition(String id, Class<T> type);
+
+ /**
* Adds a collection of routes to this context using the given builder
* to build them.
* <p/>
http://git-wip-us.apache.org/repos/asf/camel/blob/7973ac5f/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 9ac8692..b0874a5 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -719,6 +719,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
return null;
}
+ /**
+  * Looks up the processor by its id, and casts it to the given type.
+  *
+  * @param id id of the processor
+  * @param type the type to cast the processor to
+  * @return the processor as the given type, or <tt>null</tt> if no processor was found
+  * @throws ClassCastException if the processor found is not of the given type
+  */
+ public <T extends Processor> T getProcessor(String id, Class<T> type) {
+     Processor processor = getProcessor(id);
+     // only cast when something was found, otherwise the answer is null
+     return processor == null ? null : type.cast(processor);
+ }
+
public ProcessorDefinition getProcessorDefinition(String id) {
for (RouteDefinition route : getRouteDefinitions()) {
Iterator<ProcessorDefinition> it = ProcessorDefinitionHelper.filterTypeInOutputs(route.getOutputs(), ProcessorDefinition.class);
@@ -732,6 +740,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
return null;
}
+ /**
+  * Looks up the processor definition by its id, and casts it to the given type.
+  *
+  * @param id id of the processor definition
+  * @param type the type to cast the processor definition to
+  * @return the processor definition as the given type, or <tt>null</tt> if no definition was found
+  * @throws ClassCastException if the definition found is not of the given type
+  */
+ public <T extends ProcessorDefinition> T getProcessorDefinition(String id, Class<T> type) {
+     ProcessorDefinition definition = getProcessorDefinition(id);
+     // only cast when something was found, otherwise the answer is null
+     return definition == null ? null : type.cast(definition);
+ }
+
@Deprecated
public void setRoutes(List<Route> routes) {
throw new UnsupportedOperationException("Overriding existing routes is not supported yet, use addRouteCollection instead");