You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/23 11:54:13 UTC

[1/5] camel git commit: CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator.

Repository: camel
Updated Branches:
  refs/heads/master 472903bf2 -> 3f9651578


CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/50945969
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/50945969
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/50945969

Branch: refs/heads/master
Commit: 5094596967e93417dc86e5f3fa38b08fe44c9797
Parents: 472903b
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 23 07:43:41 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 23 11:55:56 2015 +0100

----------------------------------------------------------------------
 .../processor/aggregate/AggregateProcessor.java | 57 ++++++++++----------
 .../PreCompletionAwareAggregationStrategy.java  | 32 +++++++++++
 2 files changed, 62 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/50945969/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index b71c0bf..9b93c36 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -422,44 +422,47 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
 
         List<Exchange> list = new ArrayList<Exchange>();
 
-        // only need to update aggregation repository if we are not complete
         if (complete == null) {
+            // only need to update aggregation repository if we are not complete
             doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
-            // we are not complete so the answer should be null
-            answer = null;
         } else {
-            // if batch consumer completion is enabled then we need to complete the group
-            if ("consumer".equals(complete)) {
-                for (String batchKey : batchConsumerCorrelationKeys) {
-                    Exchange batchAnswer;
-                    if (batchKey.equals(key)) {
-                        // skip the current aggregated key as we have already aggregated it and have the answer
-                        batchAnswer = answer;
-                    } else {
-                        batchAnswer = aggregationRepository.get(camelContext, batchKey);
-                    }
+            // if we are complete then add the answer to the list
+            doAggregationComplete(complete, list, key, originalExchange, answer);
+        }
 
-                    if (batchAnswer != null) {
-                        batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
-                        onCompletion(batchKey, originalExchange, batchAnswer, false);
-                        list.add(batchAnswer);
-                    }
+        LOG.trace("onAggregation +++  end  +++ with correlation key: {}", key);
+        return list;
+    }
+
+    protected void doAggregationComplete(String complete, List<Exchange> list, String key, Exchange originalExchange, Exchange answer) {
+        if ("consumer".equals(complete)) {
+            for (String batchKey : batchConsumerCorrelationKeys) {
+                Exchange batchAnswer;
+                if (batchKey.equals(key)) {
+                    // skip the current aggregated key as we have already aggregated it and have the answer
+                    batchAnswer = answer;
+                } else {
+                    batchAnswer = aggregationRepository.get(camelContext, batchKey);
+                }
+
+                if (batchAnswer != null) {
+                    batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
+                    onCompletion(batchKey, originalExchange, batchAnswer, false);
+                    list.add(batchAnswer);
                 }
-                batchConsumerCorrelationKeys.clear();
-                // we have already submitted to completion, so answer should be null
-                answer = null;
-            } else {
-                // we are complete for this exchange
-                answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
-                answer = onCompletion(key, originalExchange, answer, false);
             }
+            batchConsumerCorrelationKeys.clear();
+            // we have already submitted to completion, so answer should be null
+            answer = null;
+        } else {
+            // we are complete for this exchange
+            answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
+            answer = onCompletion(key, originalExchange, answer, false);
         }
 
-        LOG.trace("onAggregation +++  end  +++ with correlation key: {}", key);
         if (answer != null) {
             list.add(answer);
         }
-        return list;
     }
 
     protected void doAggregationRepositoryAdd(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/50945969/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
new file mode 100644
index 0000000..53fc3f0
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.Exchange;
+
+public interface PreCompletionAwareAggregationStrategy extends AggregationStrategy {
+
+    /**
+     * Determines if the aggregation should complete the current group, and start a new group, or the aggregation
+     * should continue using the current group.
+     *
+     * @param oldExchange the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange)
+     * @param newExchange the newest exchange (can be <tt>null</tt> if there was no data possible to acquire)
+     * @return <tt>true</tt> to complete current group and start a new group, or <tt>false</tt> to keep using current
+     */
+    boolean preComplete(Exchange oldExchange, Exchange newExchange);
+}


[5/5] camel git commit: CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator.

Posted by da...@apache.org.
CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f965157
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f965157
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f965157

Branch: refs/heads/master
Commit: 3f9651578007dd42c02f80b92ba5d5d84097b094
Parents: efaa7bf
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 23 10:36:31 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 23 11:56:05 2015 +0100

----------------------------------------------------------------------
 .../aggregate/PreCompletionAwareAggregationStrategy.java    | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3f965157/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
index 53fc3f0..c2734bf 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
@@ -18,6 +18,15 @@ package org.apache.camel.processor.aggregate;
 
 import org.apache.camel.Exchange;
 
+/**
+ * A specialized {@link org.apache.camel.processor.aggregate.AggregationStrategy} which enables the aggregator to run
+ * in pre-completion mode. This allows the {@link #preComplete(org.apache.camel.Exchange, org.apache.camel.Exchange)} method
+ * to control the completion. Only completion timeout or interval can also be used; any other completion configuration
+ * is not in use.
+ * <p/>
+ * Using this strategy supports the use-case, where an incoming Exchange has information that may trigger the completion
+ * of the current group. And then use the new incoming Exchange to start a new group thereafter from scratch.
+ */
 public interface PreCompletionAwareAggregationStrategy extends AggregationStrategy {
 
     /**


[4/5] camel git commit: CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator.

Posted by da...@apache.org.
CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/efaa7bf7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/efaa7bf7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/efaa7bf7

Branch: refs/heads/master
Commit: efaa7bf71a674ac7a98d43b9c187860b04eef9ad
Parents: 7973ac5
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 23 10:25:09 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 23 11:56:03 2015 +0100

----------------------------------------------------------------------
 .../apache/camel/model/AggregateDefinition.java |  1 +
 .../processor/aggregate/AggregateProcessor.java | 89 ++++++++++++++------
 .../AggregatePreCompleteAwareStrategyTest.java  | 54 ++++++++++++
 ...gatePreCompleteAwareStrategyTimeoutTest.java | 54 ++++++++++++
 .../AggregatePredicateAwareStrategyTest.java    | 53 ------------
 5 files changed, 172 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 942d69b..cfcb027 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -41,6 +41,7 @@ import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
 import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
 import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
+import org.apache.camel.processor.aggregate.PreCompletionAwareAggregationStrategy;
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;

http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index b365442..fbec104 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -92,6 +92,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
     private final Processor processor;
     private String id;
     private AggregationStrategy aggregationStrategy;
+    private boolean preCompletion;
     private Expression correlationExpression;
     private AggregateController aggregateController;
     private final ExecutorService executorService;
@@ -376,6 +377,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);
 
         List<Exchange> list = new ArrayList<Exchange>();
+        String complete = null;
 
         Exchange answer;
         Exchange originalExchange = aggregationRepository.get(newExchange.getContext(), key);
@@ -396,31 +398,36 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         ExchangeHelper.prepareAggregation(oldExchange, newExchange);
 
         // check if we are pre complete
-        boolean preComplete;
-        try {
-            // put the current aggregated size on the exchange so its avail during completion check
-            newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
-            preComplete = onPreCompletionAggregation(oldExchange, newExchange);
-            // remove it afterwards
-            newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
-        } catch (Throwable e) {
-            // must catch any exception from aggregation
-            throw new CamelExchangeException("Error occurred during preComplete", newExchange, e);
-        }
-
-        // check if we are complete
-        String complete = null;
-        if (!preComplete && isEagerCheckCompletion()) {
+        if (preCompletion) {
+            try {
+                // put the current aggregated size on the exchange so its avail during completion check
+                newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
+                complete = isPreCompleted(key, oldExchange, newExchange);
+                // make sure to track timeouts if not complete
+                if (complete == null) {
+                    trackTimeout(key, newExchange);
+                }
+                // remove it afterwards
+                newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
+            } catch (Throwable e) {
+                // must catch any exception from aggregation
+                throw new CamelExchangeException("Error occurred during preComplete", newExchange, e);
+            }
+        } else if (isEagerCheckCompletion()) {
             // put the current aggregated size on the exchange so its avail during completion check
             newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
             complete = isCompleted(key, newExchange);
+            // make sure to track timeouts if not complete
+            if (complete == null) {
+                trackTimeout(key, newExchange);
+            }
             // remove it afterwards
             newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
         }
 
-        if (preComplete) {
+        if (preCompletion && complete != null) {
             // need to pre complete the current group before we aggregate
-            doAggregationComplete("strategy", list, key, originalExchange, oldExchange);
+            doAggregationComplete(complete, list, key, originalExchange, oldExchange);
             // as we complete the current group eager, we should indicate the new group is not complete
             complete = null;
             // and clear old/original exchange as we start on a new group
@@ -445,8 +452,12 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         answer.setProperty(Exchange.AGGREGATED_SIZE, size);
 
         // maybe we should check completion after the aggregation
-        if (!isEagerCheckCompletion()) {
+        if (!preCompletion && !isEagerCheckCompletion()) {
             complete = isCompleted(key, answer);
+            // make sure to track timeouts if not complete
+            if (complete == null) {
+                trackTimeout(key, newExchange);
+            }
         }
 
         if (complete == null) {
@@ -515,6 +526,22 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
     }
 
     /**
+     * Tests whether the given exchanges is pre-complete or not
+     *
+     * @param key      the correlation key
+     * @param oldExchange   the existing exchange
+     * @param newExchange the incoming exchange
+     * @return <tt>null</tt> if not pre-completed, otherwise a String with the type that triggered the pre-completion
+     */
+    protected String isPreCompleted(String key, Exchange oldExchange, Exchange newExchange) {
+        boolean complete = false;
+        if (aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) {
+            complete = ((PreCompletionAwareAggregationStrategy) aggregationStrategy).preComplete(oldExchange, newExchange);
+        }
+        return complete ? "strategy" : null;
+    }
+
+    /**
      * Tests whether the given exchange is complete or not
      *
      * @param key      the correlation key
@@ -564,6 +591,11 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             }
         }
 
+        // not complete
+        return null;
+    }
+
+    protected void trackTimeout(String key, Exchange exchange) {
         // timeout can be either evaluated based on an expression or from a fixed value
         // expression takes precedence
         boolean timeoutSet = false;
@@ -586,9 +618,6 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             }
             addExchangeToTimeoutMap(key, exchange, getCompletionTimeout());
         }
-
-        // not complete
-        return null;
     }
 
     protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) {
@@ -1182,11 +1211,19 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
 
     @Override
     protected void doStart() throws Exception {
-        if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
-                && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null
-                && getCompletionSizeExpression() == null) {
-            throw new IllegalStateException("At least one of the completions options"
-                    + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
+        if (aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) {
+            preCompletion = true;
+            LOG.info("PreCompletionAwareAggregationStrategy detected. Aggregator {} is in pre-completion mode.", getId());
+        }
+
+        if (!preCompletion) {
+            // if not in pre completion mode then check we configured the completion required
+            if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
+                    && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null
+                    && getCompletionSizeExpression() == null) {
+                throw new IllegalStateException("At least one of the completions options"
+                        + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
+            }
         }
 
         if (getCloseCorrelationKeyOnCompletion() != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java
new file mode 100644
index 0000000..f965c90
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInPreCompleteAggregatingStrategy;
+
+/**
+ * @version 
+ */
+public class AggregatePreCompleteAwareStrategyTest extends ContextTestSupport {
+
+    public void testAggregatePreComplete() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+        template.sendBodyAndHeader("direct:start", "F", "id", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInPreCompleteAggregatingStrategy())
+                        .to("mock:aggregated");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
new file mode 100644
index 0000000..abfda10
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInPreCompleteAggregatingStrategy;
+
+/**
+ * @version 
+ */
+public class AggregatePreCompleteAwareStrategyTimeoutTest extends ContextTestSupport {
+
+    public void testAggregatePreCompleteTimeout() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E", "X+F");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+        template.sendBodyAndHeader("direct:start", "F", "id", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInPreCompleteAggregatingStrategy()).completionTimeout(1000)
+                        .to("mock:aggregated");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
deleted file mode 100644
index 74fe19b..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor.aggregator;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.processor.BodyInPreCompleteAggregatingStrategy;
-
-/**
- * @version 
- */
-public class AggregatePredicateAwareStrategyTest extends ContextTestSupport {
-
-    public void testAggregatePreComplete() throws Exception {
-        getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E");
-
-        template.sendBodyAndHeader("direct:start", "A", "id", 123);
-        template.sendBodyAndHeader("direct:start", "B", "id", 123);
-        template.sendBodyAndHeader("direct:start", "C", "id", 123);
-        template.sendBodyAndHeader("direct:start", "X", "id", 123);
-        template.sendBodyAndHeader("direct:start", "D", "id", 123);
-        template.sendBodyAndHeader("direct:start", "E", "id", 123);
-        template.sendBodyAndHeader("direct:start", "X", "id", 123);
-
-        assertMockEndpointsSatisfied();
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:start")
-                    .aggregate(header("id"), new BodyInPreCompleteAggregatingStrategy()).completionSize(5)
-                        .to("mock:aggregated");
-            }
-        };
-    }
-}
\ No newline at end of file


[2/5] camel git commit: CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator.

Posted by da...@apache.org.
CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0c7b6d22
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0c7b6d22
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0c7b6d22

Branch: refs/heads/master
Commit: 0c7b6d22be17f82375ce94aaac4477f45260a171
Parents: 5094596
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 23 08:08:42 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 23 11:55:58 2015 +0100

----------------------------------------------------------------------
 .../processor/aggregate/AggregateProcessor.java | 46 ++++++++++++++---
 .../BodyInPreCompleteAggregatingStrategy.java   | 40 +++++++++++++++
 .../AggregatePredicateAwareStrategyTest.java    | 53 ++++++++++++++++++++
 3 files changed, 133 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0c7b6d22/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 9b93c36..b365442 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -375,6 +375,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
     private List<Exchange> doAggregation(String key, Exchange newExchange) throws CamelExchangeException {
         LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);
 
+        List<Exchange> list = new ArrayList<Exchange>();
+
         Exchange answer;
         Exchange originalExchange = aggregationRepository.get(newExchange.getContext(), key);
         Exchange oldExchange = originalExchange;
@@ -390,9 +392,25 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             size++;
         }
 
+        // prepare the exchanges for aggregation
+        ExchangeHelper.prepareAggregation(oldExchange, newExchange);
+
+        // check if we are pre complete
+        boolean preComplete;
+        try {
+            // put the current aggregated size on the exchange so its avail during completion check
+            newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
+            preComplete = onPreCompletionAggregation(oldExchange, newExchange);
+            // remove it afterwards
+            newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
+        } catch (Throwable e) {
+            // must catch any exception from aggregation
+            throw new CamelExchangeException("Error occurred during preComplete", newExchange, e);
+        }
+
         // check if we are complete
         String complete = null;
-        if (isEagerCheckCompletion()) {
+        if (!preComplete && isEagerCheckCompletion()) {
             // put the current aggregated size on the exchange so its avail during completion check
             newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
             complete = isCompleted(key, newExchange);
@@ -400,12 +418,23 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
         }
 
-        // prepare the exchanges for aggregation and then aggregate them
-        ExchangeHelper.prepareAggregation(oldExchange, newExchange);
-        // must catch any exception from aggregation
+        if (preComplete) {
+            // need to pre complete the current group before we aggregate
+            doAggregationComplete("strategy", list, key, originalExchange, oldExchange);
+            // as we complete the current group eager, we should indicate the new group is not complete
+            complete = null;
+            // and clear old/original exchange as we start on a new group
+            oldExchange = null;
+            originalExchange = null;
+            // and reset the size to 1
+            size = 1;
+        }
+
+        // aggregate the exchanges
         try {
             answer = onAggregation(oldExchange, newExchange);
         } catch (Throwable e) {
+            // must catch any exception from aggregation
             throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
         }
         if (answer == null) {
@@ -420,8 +449,6 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             complete = isCompleted(key, answer);
         }
 
-        List<Exchange> list = new ArrayList<Exchange>();
-
         if (complete == null) {
             // only need to update aggregation repository if we are not complete
             doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
@@ -568,6 +595,13 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         return aggregationStrategy.aggregate(oldExchange, newExchange);
     }
 
+    protected boolean onPreCompletionAggregation(Exchange oldExchange, Exchange newExchange) {
+        if (aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) {
+            return ((PreCompletionAwareAggregationStrategy) aggregationStrategy).preComplete(oldExchange, newExchange);
+        }
+        return false;
+    }
+
     protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout) {
         // store the correlation key as property before we remove so the repository has that information
         if (original != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/0c7b6d22/camel-core/src/test/java/org/apache/camel/processor/BodyInPreCompleteAggregatingStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/BodyInPreCompleteAggregatingStrategy.java b/camel-core/src/test/java/org/apache/camel/processor/BodyInPreCompleteAggregatingStrategy.java
new file mode 100644
index 0000000..11a87c8
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/BodyInPreCompleteAggregatingStrategy.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.PreCompletionAwareAggregationStrategy;
+
+public class BodyInPreCompleteAggregatingStrategy implements PreCompletionAwareAggregationStrategy {
+
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        if (oldExchange == null) {
+            return newExchange;
+        }
+
+        String oldBody = oldExchange.getIn().getBody(String.class);
+        String newBody = newExchange.getIn().getBody(String.class);
+        oldExchange.getIn().setBody(oldBody + "+" + newBody);
+        return oldExchange;
+    }
+
+    public boolean preComplete(Exchange oldExchange, Exchange newExchange) {
+        // pre complete when new body has an X
+        String newBody = newExchange.getIn().getBody(String.class);
+        return newBody.contains("X");
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c7b6d22/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
new file mode 100644
index 0000000..74fe19b
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInPreCompleteAggregatingStrategy;
+
+/**
+ * @version 
+ */
+public class AggregatePredicateAwareStrategyTest extends ContextTestSupport {
+
+    public void testAggregatePreComplete() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInPreCompleteAggregatingStrategy()).completionSize(5)
+                        .to("mock:aggregated");
+            }
+        };
+    }
+}
\ No newline at end of file


[3/5] camel git commit: CAMEL-8527: Processor in routes should be IdAware

Posted by da...@apache.org.
CAMEL-8527: Processor in routes should be IdAware


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7973ac5f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7973ac5f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7973ac5f

Branch: refs/heads/master
Commit: 7973ac5f1baba82c4a8b5340d441bf6881a772c6
Parents: 0c7b6d2
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 23 08:23:09 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 23 11:56:00 2015 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/CamelContext.java     | 20 ++++++++++++++++++++
 .../apache/camel/impl/DefaultCamelContext.java  | 16 ++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7973ac5f/camel-core/src/main/java/org/apache/camel/CamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java
index b92349e..0b7d8b6 100644
--- a/camel-core/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java
@@ -522,6 +522,16 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
     Processor getProcessor(String id);
 
     /**
+     * Gets the processor from any of the routes which with the given id
+     *
+     * @param id id of the processor
+     * @param type the processor type
+     * @return the processor or <tt>null</tt> if not found
+     * @throws java.lang.ClassCastException is thrown if the type is not correct type
+     */
+    <T extends Processor> T getProcessor(String id, Class<T> type);
+
+    /**
      * Gets the processor definition from any of the routes which with the given id
      *
      * @param id id of the processor definition
@@ -530,6 +540,16 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
     ProcessorDefinition getProcessorDefinition(String id);
 
     /**
+     * Gets the processor definition from any of the routes which with the given id
+     *
+     * @param id id of the processor definition
+     * @param type the processor definition type
+     * @return the processor definition or <tt>null</tt> if not found
+     * @throws java.lang.ClassCastException is thrown if the type is not correct type
+     */
+    <T extends ProcessorDefinition> T getProcessorDefinition(String id, Class<T> type);
+
+    /**
      * Adds a collection of routes to this context using the given builder
      * to build them.
      * <p/>

http://git-wip-us.apache.org/repos/asf/camel/blob/7973ac5f/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 9ac8692..b0874a5 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -719,6 +719,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
         return null;
     }
 
+    public <T extends Processor> T getProcessor(String id, Class<T> type) {
+        Processor answer = getProcessor(id);
+        if (answer != null) {
+            return type.cast(answer);
+        }
+        return null;
+    }
+
     public ProcessorDefinition getProcessorDefinition(String id) {
         for (RouteDefinition route : getRouteDefinitions()) {
             Iterator<ProcessorDefinition> it = ProcessorDefinitionHelper.filterTypeInOutputs(route.getOutputs(), ProcessorDefinition.class);
@@ -732,6 +740,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
         return null;
     }
 
+    public <T extends ProcessorDefinition> T getProcessorDefinition(String id, Class<T> type) {
+        ProcessorDefinition answer = getProcessorDefinition(id);
+        if (answer != null) {
+            return type.cast(answer);
+        }
+        return null;
+    }
+
     @Deprecated
     public void setRoutes(List<Route> routes) {
         throw new UnsupportedOperationException("Overriding existing routes is not supported yet, use addRouteCollection instead");