You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/18 12:27:40 UTC

svn commit: r911334 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/aggregate/ camel-core/src/test/java/org/apache/camel/processor/aggregator/ components/camel-spring/src/test/java...

Author: davsclaus
Date: Thu Feb 18 11:27:39 2010
New Revision: 911334

URL: http://svn.apache.org/viewvc?rev=911334&view=rev
Log:
CAMEL-2140: Aggregator supporting dynamic completion size and timeout.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionSizeFallbackTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionSizeTest.java
      - copied, changed from r911285, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutFallbackTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java
      - copied, changed from r911285, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java   (with props)
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.java   (with props)
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.java   (with props)
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.java   (with props)
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutTest.java
      - copied, changed from r911293, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml   (with props)
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.xml   (with props)
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.xml   (with props)
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.xml   (with props)
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutTest.xml
      - copied, changed from r911293, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=911334&r1=911333&r2=911334&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Thu Feb 18 11:27:39 2010
@@ -51,6 +51,10 @@
     private ExpressionSubElementDefinition correlationExpression;
     @XmlElement(name = "completionPredicate", required = false)
     private ExpressionSubElementDefinition completionPredicate;
+    @XmlElement(name = "completionTimeout", required = false)
+    private ExpressionSubElementDefinition completionTimeoutExpression;
+    @XmlElement(name = "completionSize", required = false)
+    private ExpressionSubElementDefinition completionSizeExpression;
     @XmlTransient
     private ExpressionDefinition expression;
     @XmlElementRef
@@ -160,12 +164,20 @@
             Predicate predicate = getCompletionPredicate().createPredicate(routeContext);
             answer.setCompletionPredicate(predicate);
         }
-        if (getCompletionSize() != null) {
-            answer.setCompletionSize(getCompletionSize());
+        if (getCompletionTimeoutExpression() != null) {
+            Expression expression = getCompletionTimeoutExpression().createExpression(routeContext);
+            answer.setCompletionTimeoutExpression(expression);
         }
         if (getCompletionTimeout() != null) {
             answer.setCompletionTimeout(getCompletionTimeout());
         }
+        if (getCompletionSizeExpression() != null) {
+            Expression expression = getCompletionSizeExpression().createExpression(routeContext);
+            answer.setCompletionSizeExpression(expression);
+        }
+        if (getCompletionSize() != null) {
+            answer.setCompletionSize(getCompletionSize());
+        }
         if (isCompletionFromBatchConsumer() != null) {
             answer.setCompletionFromBatchConsumer(isCompletionFromBatchConsumer());
         }
@@ -252,6 +264,22 @@
         this.completionPredicate = completionPredicate;
     }
 
+    public ExpressionSubElementDefinition getCompletionTimeoutExpression() {
+        return completionTimeoutExpression;
+    }
+
+    public void setCompletionTimeoutExpression(ExpressionSubElementDefinition completionTimeoutExpression) {
+        this.completionTimeoutExpression = completionTimeoutExpression;
+    }
+
+    public ExpressionSubElementDefinition getCompletionSizeExpression() {
+        return completionSizeExpression;
+    }
+
+    public void setCompletionSizeExpression(ExpressionSubElementDefinition completionSizeExpression) {
+        this.completionSizeExpression = completionSizeExpression;
+    }
+
     public Boolean isGroupExchanges() {
         return groupExchanges;
     }
@@ -404,6 +432,18 @@
     }
 
     /**
+     * Sets the completion size, which is the number of aggregated exchanges which would
+     * cause the aggregate to consider the group as complete and send out the aggregated exchange.
+     *
+     * @param completionSize  the completion size as an {@link org.apache.camel.Expression} which is evaluated as an {@link Integer} type
+     * @return builder
+     */
+    public AggregateDefinition completionSize(Expression completionSize) {
+        setCompletionSizeExpression(new ExpressionSubElementDefinition(completionSize));
+        return this;
+    }
+
+    /**
      * Sets the completion timeout, which would cause the aggregate to consider the group as complete
      * and send out the aggregated exchange.
      *
@@ -416,6 +456,18 @@
     }
 
     /**
+     * Sets the completion timeout, which would cause the aggregate to consider the group as complete
+     * and send out the aggregated exchange.
+     *
+     * @param completionTimeout  the timeout as an {@link Expression} which is evaluated as a {@link Long} type
+     * @return the builder
+     */
+    public AggregateDefinition completionTimeout(Expression completionTimeout) {
+        setCompletionTimeoutExpression(new ExpressionSubElementDefinition(completionTimeout));
+        return this;
+    }
+
+    /**
      * Sets the aggregate strategy to use
      *
      * @param aggregationStrategy  the aggregate strategy to use

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=911334&r1=911333&r2=911334&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Thu Feb 18 11:27:39 2010
@@ -85,7 +85,9 @@
     private boolean eagerCheckCompletion;
     private Predicate completionPredicate;
     private long completionTimeout;
+    private Expression completionTimeoutExpression;
     private int completionSize;
+    private Expression completionSizeExpression;
     private boolean completionFromBatchConsumer;
     private AtomicInteger batchConsumerCounter = new AtomicInteger();
 
@@ -221,6 +223,15 @@
             }
         }
 
+        if (getCompletionSizeExpression() != null) {
+            Integer value = getCompletionSizeExpression().evaluate(exchange, Integer.class);
+            if (value != null && value > 0) {
+                int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
+                if (size >= value) {
+                    return true;
+                }
+            }
+        }
         if (getCompletionSize() > 0) {
             int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
             if (size >= getCompletionSize()) {
@@ -228,7 +239,21 @@
             }
         }
 
-        if (getCompletionTimeout() > 0) {
+        // timeout can be either evaluated based on an expression or from a fixed value
+        // expression takes precedence
+        boolean timeoutSet = false;
+        if (getCompletionTimeoutExpression() != null) {
+            Long value = getCompletionTimeoutExpression().evaluate(exchange, Long.class);
+            if (value != null && value > 0) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Updating correlation key " + key + " to timeout after "
+                            + value + " ms. as exchange received: " + exchange);
+                }
+                timeoutMap.put(key, exchange, value);
+                timeoutSet = true;
+            }
+        }
+        if (!timeoutSet && getCompletionTimeout() > 0) {
             // timeout is used so use the timeout map to keep an eye on this
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Updating correlation key " + key + " to timeout after "
@@ -323,6 +348,14 @@
         this.completionTimeout = completionTimeout;
     }
 
+    public Expression getCompletionTimeoutExpression() {
+        return completionTimeoutExpression;
+    }
+
+    public void setCompletionTimeoutExpression(Expression completionTimeoutExpression) {
+        this.completionTimeoutExpression = completionTimeoutExpression;
+    }
+
     public int getCompletionSize() {
         return completionSize;
     }
@@ -331,6 +364,14 @@
         this.completionSize = completionSize;
     }
 
+    public Expression getCompletionSizeExpression() {
+        return completionSizeExpression;
+    }
+
+    public void setCompletionSizeExpression(Expression completionSizeExpression) {
+        this.completionSizeExpression = completionSizeExpression;
+    }
+
     public boolean isIgnoreBadCorrelationKeys() {
         return ignoreBadCorrelationKeys;
     }
@@ -402,9 +443,11 @@
 
     @Override
     protected void doStart() throws Exception {
-        if (getCompletionTimeout() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null) {
+        if (getCompletionTimeout() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
+                && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null
+                && getCompletionSizeExpression() == null) {
             throw new IllegalStateException("At least one of the completions options"
-                    + " [completionTimeout, completionAggregatedSize, completionPredicate] must be set");
+                    + " [completionTimeout, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
         }
 
         if (getCloseCorrelationKeyOnCompletion() != null) {
@@ -431,7 +474,7 @@
         }
 
         // start timeout service if its in use
-        if (getCompletionTimeout() > 0) {
+        if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
             ScheduledExecutorService scheduler = ExecutorServiceHelper.newScheduledThreadPool(1, "AggregateTimeoutChecker", true);
             timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);
             ServiceHelper.startService(timeoutMap);

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionSizeFallbackTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionSizeFallbackTest.java?rev=911334&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionSizeFallbackTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionSizeFallbackTest.java Thu Feb 18 11:27:39 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+
+/**
+ * @version $Revision$
+ */
+public class AggregateExpressionSizeFallbackTest extends ContextTestSupport {
+
+    public void testAggregateExpressionSizeFallback() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C");
+
+        Map headers = new HashMap();
+        headers.put("id", 123);
+
+        template.sendBodyAndHeaders("direct:start", "A", headers);
+        template.sendBodyAndHeaders("direct:start", "B", headers);
+        template.sendBodyAndHeaders("direct:start", "C", headers);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        // if there is no mySize header it will fall back to a completion size of 3
+                        .completionSize(header("mySize")).completionSize(3)
+                        .to("mock:aggregated");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionSizeFallbackTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionSizeFallbackTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionSizeTest.java (from r911285, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionSizeTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionSizeTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java&r1=911285&r2=911334&rev=911334&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionSizeTest.java Thu Feb 18 11:27:39 2010
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.processor.aggregator;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.BodyInAggregatingStrategy;
@@ -23,14 +26,18 @@
 /**
  * @version $Revision$
  */
-public class AggregateSimpleSizeTest extends ContextTestSupport {
+public class AggregateExpressionSizeTest extends ContextTestSupport {
 
-    public void testAggregateSimpleSize() throws Exception {
+    public void testAggregateExpressionSize() throws Exception {
         getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C");
 
-        template.sendBodyAndHeader("direct:start", "A", "id", 123);
-        template.sendBodyAndHeader("direct:start", "B", "id", 123);
-        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        Map headers = new HashMap();
+        headers.put("id", 123);
+        headers.put("mySize", 3);
+
+        template.sendBodyAndHeaders("direct:start", "A", headers);
+        template.sendBodyAndHeaders("direct:start", "B", headers);
+        template.sendBodyAndHeaders("direct:start", "C", headers);
 
         assertMockEndpointsSatisfied();
     }
@@ -44,9 +51,9 @@
                 from("direct:start")
                     // aggregate all exchanges correlated by the id header.
                     // Aggregate them using the BodyInAggregatingStrategy strategy which
-                    // and after 3 messages has been aggregated then complete the aggregation
+                    // and the header mySize determines how many aggregated messages should trigger the completion
                     // and send it to mock:aggregated
-                    .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(3)
+                    .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(header("mySize"))
                         .to("mock:aggregated");
                 // END SNIPPET: e1
             }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutFallbackTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutFallbackTest.java?rev=911334&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutFallbackTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutFallbackTest.java Thu Feb 18 11:27:39 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+
+/**
+ * @version $Revision$
+ */
+public class AggregateExpressionTimeoutFallbackTest extends ContextTestSupport {
+
+    public void testAggregateExpressionTimeoutFallback() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C");
+
+        Map headers = new HashMap();
+        headers.put("id", 123);
+
+        template.sendBodyAndHeaders("direct:start", "A", headers);
+        template.sendBodyAndHeaders("direct:start", "B", headers);
+        template.sendBodyAndHeaders("direct:start", "C", headers);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        // if there is no timeout header it will fall back to the 2000 millis timeout
+                        .completionTimeout(header("timeout")).completionTimeout(2000)
+                        .to("mock:aggregated");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutFallbackTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutFallbackTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java?rev=911334&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java Thu Feb 18 11:27:39 2010
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+
+/**
+ * @version $Revision$
+ */
+public class AggregateExpressionTimeoutPerGroupTest extends ContextTestSupport {
+
+    public void testAggregateExpressionPerGroupTimeout() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("G+H+I", "D+E+F", "A+B+C");
+
+        // will use fallback timeout (5 sec)
+        template.sendBodyAndHeader("direct:start", "A", "id", 789);
+        template.sendBodyAndHeader("direct:start", "B", "id", 789);
+        template.sendBodyAndHeader("direct:start", "C", "id", 789);
+
+        Map headers = new HashMap();
+        headers.put("id", 123);
+        headers.put("timeout", 3000);
+
+        // will use 3 sec timeout
+        template.sendBodyAndHeaders("direct:start", "D", headers);
+        template.sendBodyAndHeaders("direct:start", "E", headers);
+        template.sendBodyAndHeaders("direct:start", "F", headers);
+
+        Map headers2 = new HashMap();
+        headers2.put("id", 456);
+        headers2.put("timeout", 1000);
+
+        // will use 1 sec timeout
+        template.sendBodyAndHeaders("direct:start", "G", headers2);
+        template.sendBodyAndHeaders("direct:start", "H", headers2);
+        template.sendBodyAndHeaders("direct:start", "I", headers2);
+
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                from("direct:start")
+                    // aggregate all exchanges correlated by the id header.
+                    // Aggregate them using the BodyInAggregatingStrategy strategy.
+                    // The timeout header contains the inactivity timeout in millis; when it elapses, complete the aggregation
+                    // and send it to mock:aggregated
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        .completionTimeout(header("timeout")).completionTimeout(5000)
+                        .to("mock:aggregated");
+                // END SNIPPET: e1
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java (from r911285, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java&r1=911285&r2=911334&rev=911334&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java Thu Feb 18 11:27:39 2010
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.processor.aggregator;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.BodyInAggregatingStrategy;
@@ -23,14 +26,18 @@
 /**
  * @version $Revision$
  */
-public class AggregateSimpleTimeoutTest extends ContextTestSupport {
+public class AggregateExpressionTimeoutTest extends ContextTestSupport {
 
-    public void testAggregateSimpleTimeout() throws Exception {
+    public void testAggregateExpressionTimeout() throws Exception {
         getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C");
 
-        template.sendBodyAndHeader("direct:start", "A", "id", 123);
-        template.sendBodyAndHeader("direct:start", "B", "id", 123);
-        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        Map headers = new HashMap();
+        headers.put("id", 123);
+        headers.put("timeout", 2000);
+
+        template.sendBodyAndHeaders("direct:start", "A", headers);
+        template.sendBodyAndHeaders("direct:start", "B", headers);
+        template.sendBodyAndHeaders("direct:start", "C", headers);
 
         assertMockEndpointsSatisfied();
     }
@@ -44,12 +51,12 @@
                 from("direct:start")
                     // aggregate all exchanges correlated by the id header.
                     // Aggregate them using the BodyInAggregatingStrategy strategy which
-                    // and after 3 seconds of inactivity them timeout and complete the aggregation
+                    // and the timeout header contains the inactivity timeout in millis after which to time out and complete the aggregation
                     // and send it to mock:aggregated
-                    .aggregate(header("id"), new BodyInAggregatingStrategy()).completionTimeout(3000)
+                    .aggregate(header("id"), new BodyInAggregatingStrategy()).completionTimeout(header("timeout"))
                         .to("mock:aggregated");
                 // END SNIPPET: e1
             }
         };
     }
-}
+}
\ No newline at end of file

Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java?rev=911334&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java Thu Feb 18 11:27:39 2010
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor.aggregator;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.aggregator.AggregateExpressionSizeFallbackTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+/**
+ * @version $Revision$
+ */
+public class SpringAggregateExpressionSizeFallbackTest extends AggregateExpressionSizeFallbackTest {
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml");
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.java?rev=911334&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.java Thu Feb 18 11:27:39 2010
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor.aggregator;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.aggregator.AggregateExpressionSizeTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+/**
+ * @version $Revision$
+ */
+public class SpringAggregateExpressionSizeTest extends AggregateExpressionSizeTest {
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.xml");
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.java?rev=911334&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.java Thu Feb 18 11:27:39 2010
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor.aggregator;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.aggregator.AggregateExpressionTimeoutFallbackTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+/**
+ * @version $Revision$
+ */
+public class SpringAggregateExpressionTimeoutFallbackTest extends AggregateExpressionTimeoutFallbackTest {
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.xml");
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.java?rev=911334&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.java Thu Feb 18 11:27:39 2010
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor.aggregator;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.aggregator.AggregateExpressionTimeoutPerGroupTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+/**
+ * @version $Revision$
+ */
+public class SpringAggregateExpressionTimeoutPerGroupTest extends AggregateExpressionTimeoutPerGroupTest {
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.xml");
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutTest.java (from r911293, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java&r1=911293&r2=911334&rev=911334&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutTest.java Thu Feb 18 11:27:39 2010
@@ -17,17 +17,17 @@
 package org.apache.camel.spring.processor.aggregator;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.processor.aggregator.AggregateSimpleTimeoutTest;
+import org.apache.camel.processor.aggregator.AggregateExpressionTimeoutTest;
 
 import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
 
 /**
  * @version $Revision$
  */
-public class SpringAggregateSimpleTimeoutTest extends AggregateSimpleTimeoutTest {
+public class SpringAggregateExpressionTimeoutTest extends AggregateExpressionTimeoutTest {
 
     protected CamelContext createCamelContext() throws Exception {
-        return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml");
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutTest.xml");
     }
 
-}
+}
\ No newline at end of file

Added: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml?rev=911334&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml (added)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml Thu Feb 18 11:27:39 2010
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+
+    <!-- START SNIPPET: e1 -->
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+        <route>
+            <from uri="direct:start"/>
+            <aggregate strategyRef="aggregatorStrategy" completionSize="3">
+                <correlationExpression>
+                    <simple>header.id</simple>
+                </correlationExpression>
+                <completionSize>
+                    <header>mySize</header>
+                </completionSize>
+                <to uri="mock:aggregated"/>
+            </aggregate>
+        </route>
+    </camelContext>
+
+    <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
+    <!-- END SNIPPET: e1 -->
+
+</beans>

Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeFallbackTest.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Added: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.xml?rev=911334&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.xml (added)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.xml Thu Feb 18 11:27:39 2010
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+
+    <!-- START SNIPPET: e1 -->
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+        <route>
+            <from uri="direct:start"/>
+            <aggregate strategyRef="aggregatorStrategy">
+                <correlationExpression>
+                    <simple>header.id</simple>
+                </correlationExpression>
+                <completionSize>
+                    <header>mySize</header>
+                </completionSize>
+                <to uri="mock:aggregated"/>
+            </aggregate>
+        </route>
+    </camelContext>
+
+    <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
+    <!-- END SNIPPET: e1 -->
+
+</beans>

Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Added: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.xml?rev=911334&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.xml (added)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.xml Thu Feb 18 11:27:39 2010
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+
+    <!-- START SNIPPET: e1 -->
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+        <route>
+            <from uri="direct:start"/>
+            <aggregate strategyRef="aggregatorStrategy" completionTimeout="2000">
+                <correlationExpression>
+                    <simple>header.id</simple>
+                </correlationExpression>
+                <completionTimeout>
+                    <header>timeout</header>
+                </completionTimeout>
+                <to uri="mock:aggregated"/>
+            </aggregate>
+        </route>
+    </camelContext>
+
+    <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
+    <!-- END SNIPPET: e1 -->
+
+</beans>

Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutFallbackTest.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Added: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.xml?rev=911334&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.xml (added)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.xml Thu Feb 18 11:27:39 2010
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+
+    <!-- START SNIPPET: e1 -->
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+        <route>
+            <from uri="direct:start"/>
+            <aggregate strategyRef="aggregatorStrategy" completionTimeout="5000">
+                <correlationExpression>
+                    <simple>header.id</simple>
+                </correlationExpression>
+                <completionTimeout>
+                    <header>timeout</header>
+                </completionTimeout>
+                <to uri="mock:aggregated"/>
+            </aggregate>
+        </route>
+    </camelContext>
+
+    <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
+    <!-- END SNIPPET: e1 -->
+
+</beans>

Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutPerGroupTest.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutTest.xml (from r911293, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml&r1=911293&r2=911334&rev=911334&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutTest.xml Thu Feb 18 11:27:39 2010
@@ -26,10 +26,13 @@
     <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
         <route>
             <from uri="direct:start"/>
-            <aggregate strategyRef="aggregatorStrategy" completionTimeout="3000">
+            <aggregate strategyRef="aggregatorStrategy">
                 <correlationExpression>
                     <simple>header.id</simple>
                 </correlationExpression>
+                <completionTimeout>
+                    <header>timeout</header>
+                </completionTimeout>
                 <to uri="mock:aggregated"/>
             </aggregate>
         </route>