You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/17 11:11:07 UTC

svn commit: r910890 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/aggregate/ main/java/org/apache/camel/util/ test/java/org/apache/camel/processor/aggregator/

Author: davsclaus
Date: Wed Feb 17 10:11:06 2010
New Revision: 910890

URL: http://svn.apache.org/viewvc?rev=910890&view=rev
Log:
CAMEL-1686: Overhaul of aggregator. Work in progress.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/ClosedCorrelationKeyException.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateIgnoreBadCorrelationKeysTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=910890&r1=910889&r2=910890&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Wed Feb 17 10:11:06 2010
@@ -18,6 +18,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -55,6 +56,12 @@
     private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
+    @XmlTransient
+    private ExecutorService executorService;
+    @XmlAttribute(required = false)
+    private Boolean parallelProcessing;
+    @XmlAttribute(required = false)
+    private String executorServiceRef;
     @XmlAttribute(required = true)
     private String strategyRef;
     @XmlAttribute(required = false)
@@ -65,6 +72,12 @@
     private Boolean completionFromBatchConsumer;
     @XmlAttribute(required = false)
     private Boolean groupExchanges;
+    @XmlAttribute(required = false)
+    private Boolean eagerCheckCompletion;
+    @XmlAttribute(required = false)
+    private Boolean ignoreBadCorrelationKeys;
+    @XmlAttribute(required = false)
+    private Boolean closeCorrelationKeyOnCompletion;
 
     public AggregateDefinition() {
     }
@@ -127,11 +140,16 @@
 
         AggregateProcessor answer = new AggregateProcessor(processor, correlation, strategy);
 
+        ExecutorService executor = createExecutorService(routeContext);
+        answer.setExecutorService(executor);
+        if (isParallelProcessing() != null) {
+            answer.setParallelProcessing(isParallelProcessing());
+        }
+
         if (getCompletionPredicate() != null) {
             Predicate predicate = getCompletionPredicate().createPredicate(routeContext);
             answer.setCompletionPredicate(predicate);
         }
-
         if (getCompletionSize() != null) {
             answer.setCompletionSize(getCompletionSize());
         }
@@ -141,6 +159,15 @@
         if (isCompletionFromBatchConsumer() != null) {
             answer.setCompletionFromBatchConsumer(isCompletionFromBatchConsumer());
         }
+        if (isCloseCorrelationKeyOnCompletion() != null) {
+            answer.setCloseCorrelationKeyOnCompletion(isCloseCorrelationKeyOnCompletion());
+        }
+        if (isEagerCheckCompletion() != null) {
+            answer.setEagerCheckCompletion(isEagerCheckCompletion());
+        }
+        if (isIgnoreBadCorrelationKeys() != null) {
+            answer.setIgnoreBadCorrelationKeys(isIgnoreBadCorrelationKeys());
+        }
 
         return answer;
     }
@@ -160,6 +187,13 @@
         return strategy;
     }
 
+    private ExecutorService createExecutorService(RouteContext routeContext) {
+        if (executorService == null && executorServiceRef != null) {
+            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
+        }
+        return executorService;
+    }
+
     public AggregationStrategy getAggregationStrategy() {
         return aggregationStrategy;
     }
@@ -216,10 +250,100 @@
         this.completionFromBatchConsumer = completionFromBatchConsumer;
     }
 
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    public Boolean isParallelProcessing() {
+        return parallelProcessing;
+    }
+
+    public void setParallelProcessing(Boolean parallelProcessing) {
+        this.parallelProcessing = parallelProcessing;
+    }
+
+    public String getExecutorServiceRef() {
+        return executorServiceRef;
+    }
+
+    public void setExecutorServiceRef(String executorServiceRef) {
+        this.executorServiceRef = executorServiceRef;
+    }
+
+    public String getStrategyRef() {
+        return strategyRef;
+    }
+
+    public void setStrategyRef(String strategyRef) {
+        this.strategyRef = strategyRef;
+    }
+
+    public Boolean isEagerCheckCompletion() {
+        return eagerCheckCompletion;
+    }
+
+    public void setEagerCheckCompletion(Boolean eagerCheckCompletion) {
+        this.eagerCheckCompletion = eagerCheckCompletion;
+    }
+
+    public Boolean isIgnoreBadCorrelationKeys() {
+        return ignoreBadCorrelationKeys;
+    }
+
+    public void setIgnoreBadCorrelationKeys(Boolean ignoreBadCorrelationKeys) {
+        this.ignoreBadCorrelationKeys = ignoreBadCorrelationKeys;
+    }
+
+    public Boolean isCloseCorrelationKeyOnCompletion() {
+        return closeCorrelationKeyOnCompletion;
+    }
+
+    public void setCloseCorrelationKeyOnCompletion(Boolean closeCorrelationKeyOnCompletion) {
+        this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
+    }
+
     // Fluent API
     //-------------------------------------------------------------------------
 
     /**
+     * Use eager completion checking, which means that the {@code completionPredicate} will use the incoming Exchange.
+     * As opposed to without eager completion checking, where the {@code completionPredicate} will use the aggregated Exchange.
+     *
+     * @return builder
+     */
+    public AggregateDefinition eagerCheckCompletion() {
+        setEagerCheckCompletion(true);
+        return this;
+    }
+
+    /**
+     * If a correlation key cannot be successfully evaluated, this is logged at {@code DEBUG} level and the
+     * incoming Exchange is then just ignored.
+     *
+     * @return builder
+     */
+    public AggregateDefinition ignoreBadCorrelationKeys() {
+        setIgnoreBadCorrelationKeys(true);
+        return this;
+    }
+
+    /**
+     * Closes a correlation key when it is complete. Any <i>late</i> received exchange which has a correlation key
+     * that has been closed will be denied and a {@link org.apache.camel.processor.aggregate.ClosedCorrelationKeyException}
+     * is thrown.
+     *
+     * @return builder
+     */
+    public AggregateDefinition closeCorrelationKeyOnCompletion() {
+        setCloseCorrelationKeyOnCompletion(true);
+        return this;
+    }
+
+    /**
      * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
      * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
      * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete.
@@ -313,6 +437,38 @@
         return this;
     }
 
+    /**
+     * Sending the aggregated output in parallel
+     *
+     * @return the builder
+     */
+    public AggregateDefinition parallelProcessing() {
+        setParallelProcessing(true);
+        return this;
+    }
+
+    /**
+     * Sets the executor service to use for sending the aggregated output.
+     *
+     * @param executorService the executor service
+     * @return the builder
+     */
+    public AggregateDefinition executorService(ExecutorService executorService) {
+        setExecutorService(executorService);
+        return this;
+    }
+
+    /**
+     * Sets a reference to the executor service to use for sending the aggregated output.
+     *
+     * @param executorServiceRef reference to the executor service
+     * @return the builder
+     */
+    public AggregateDefinition executorServiceRef(String executorServiceRef) {
+        setExecutorServiceRef(executorServiceRef);
+        return this;
+    }
+
     protected void checkNoCompletedPredicate() {
         if (getCompletionPredicate() != null) {
             throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=910890&r1=910889&r2=910890&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Wed Feb 17 10:11:06 2010
@@ -151,7 +151,7 @@
     }
 
     /**
-     * Doing the splitting work in parallel
+     * Doing the recipient list work in parallel
      *
      * @return the builder
      */
@@ -184,6 +184,17 @@
         return this;
     }
 
+    /**
+     * Setting the executor service for executing the sending to the recipients.
+     *
+     * @param executorServiceRef reference to the executor service
+     * @return the builder
+     */
+    public RecipientListDefinition executorServiceRef(String executorServiceRef) {
+        setExecutorServiceRef(executorServiceRef);
+        return this;
+    }
+
     // Properties
     //-------------------------------------------------------------------------
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=910890&r1=910889&r2=910890&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Feb 17 10:11:06 2010
@@ -62,23 +62,21 @@
  */
 public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
 
-    // TODO: Add support for parallelProcessing, setting custom ExecutorService like multicast
-
     private static final Log LOG = LogFactory.getLog(AggregateProcessor.class);
 
-    private TimeoutMap<Object, Exchange> timeoutMap;
     private final Processor processor;
     private final AggregationStrategy aggregationStrategy;
     private final Expression correlationExpression;
+    private TimeoutMap<Object, Exchange> timeoutMap;
     private ExecutorService executorService;
+    private ExceptionHandler exceptionHandler;
     private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository();
     private Set<Object> closedCorrelationKeys = new HashSet<Object>();
-    private ExceptionHandler exceptionHandler;
 
     // options
     private boolean ignoreBadCorrelationKeys;
     private boolean closeCorrelationKeyOnCompletion;
-    private int concurrentConsumers = 1;
+    private boolean parallelProcessing;
 
     // different ways to have completion triggered
     private boolean eagerCheckCompletion;
@@ -137,7 +135,7 @@
         // is the correlation key closed?
         if (isCloseCorrelationKeyOnCompletion()) {
             if (closedCorrelationKeys.contains(key)) {
-                throw new CamelExchangeException("Correlation key has been closed", exchange);
+                throw new ClosedCorrelationKeyException(key, exchange);
             }
         }
 
@@ -344,14 +342,6 @@
         this.completionFromBatchConsumer = completionFromBatchConsumer;
     }
 
-    public int getConcurrentConsumers() {
-        return concurrentConsumers;
-    }
-
-    public void setConcurrentConsumers(int concurrentConsumers) {
-        this.concurrentConsumers = concurrentConsumers;
-    }
-
     public ExceptionHandler getExceptionHandler() {
         if (exceptionHandler == null) {
             exceptionHandler = new LoggingExceptionHandler(getClass());
@@ -363,6 +353,14 @@
         this.exceptionHandler = exceptionHandler;
     }
 
+    public boolean isParallelProcessing() {
+        return parallelProcessing;
+    }
+
+    public void setParallelProcessing(boolean parallelProcessing) {
+        this.parallelProcessing = parallelProcessing;
+    }
+
     /**
      * Background tasks that looks for aggregated exchanges which is triggered by completion timeouts.
      */
@@ -391,12 +389,18 @@
         ServiceHelper.startService(aggregationRepository);
 
         if (executorService == null) {
-            executorService = ExecutorServiceHelper.newFixedThreadPool(getConcurrentConsumers(), "AggregateProcessor", true);
+            if (isParallelProcessing()) {
+                // we are running in parallel so create a default thread pool
+                executorService = ExecutorServiceHelper.newFixedThreadPool(10, "Aggregator", true);
+            } else {
+                // use a single-threaded pool if we are not running in parallel
+                executorService = ExecutorServiceHelper.newFixedThreadPool(1, "Aggregator", true);
+            }
         }
 
         // start timeout service if its in use
         if (getCompletionTimeout() > 0) {
-            ScheduledExecutorService scheduler = ExecutorServiceHelper.newScheduledThreadPool(1, "AggregateProcessorTimeoutCompletion", true);
+            ScheduledExecutorService scheduler = ExecutorServiceHelper.newScheduledThreadPool(1, "AggregateTimeoutChecker", true);
             timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);
             ServiceHelper.startService(timeoutMap);
         }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/ClosedCorrelationKeyException.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/ClosedCorrelationKeyException.java?rev=910890&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/ClosedCorrelationKeyException.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/ClosedCorrelationKeyException.java Wed Feb 17 10:11:06 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+
+/**
+ * The correlation key has been closed and the Exchange cannot be aggregated.
+ *
+ * @version $Revision$
+ */
+public class ClosedCorrelationKeyException extends CamelExchangeException {
+
+    // the correlation key is not expected to be serialized
+    private transient Object correlationKey;
+
+    public ClosedCorrelationKeyException(Object correlationKey, Exchange exchange) {
+        super("The correlation key [" + correlationKey + "] has been closed", exchange);
+        this.correlationKey = correlationKey;
+    }
+
+    public ClosedCorrelationKeyException(Object correlationKey, Exchange exchange, Throwable cause) {
+        super("The correlation key [" + correlationKey + "] has been closed", exchange, cause);
+        this.correlationKey = correlationKey;
+    }
+
+    public Object getCorrelationKey() {
+        return correlationKey;
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/ClosedCorrelationKeyException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/ClosedCorrelationKeyException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java?rev=910890&r1=910889&r2=910890&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java Wed Feb 17 10:11:06 2010
@@ -123,7 +123,7 @@
 
     public void purge() {
         if (log.isTraceEnabled()) {
-            log.debug("There are " + map.size() + " in the timeout map");
+            log.trace("There are " + map.size() + " in the timeout map");
         }
         long now = currentTime();
 

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java?rev=910890&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java Wed Feb 17 10:11:06 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.processor.aggregate.ClosedCorrelationKeyException;
+
+/**
+ * @version $Revision$
+ */
+public class AggregateClosedCorrelationKeyTest extends ContextTestSupport {
+
+    public void testAggregateClosedCorrelationKey() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("A+B");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 1);
+        template.sendBodyAndHeader("direct:start", "B", "id", 1);
+
+        // should be closed
+        try {
+            template.sendBodyAndHeader("direct:start", "C", "id", 1);
+            fail("Should throw an exception");
+        } catch (CamelExecutionException e) {
+            ClosedCorrelationKeyException cause = assertIsInstanceOf(ClosedCorrelationKeyException.class, e.getCause());
+            assertEquals(1, cause.getCorrelationKey());
+            assertEquals("The correlation key [1] has been closed. Exchange[Message: C]", cause.getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        .completionSize(2).closeCorrelationKeyOnCompletion()
+                        .to("mock:result");
+
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java?rev=910890&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java Wed Feb 17 10:11:06 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+
+/**
+ * @version $Revision$
+ */
+public class AggregateEagerCheckCompletionTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testAggregateEagerCheckCompletion() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        .completionPredicate(body().isEqualTo("END")).eagerCheckCompletion()
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("A+B+END");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 1);
+        template.sendBodyAndHeader("direct:start", "B", "id", 1);
+        template.sendBodyAndHeader("direct:start", "END", "id", 1);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testAggregateNotEagerCheckCompletion() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        .completionPredicate(body().isEqualTo("A+B+END"))
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("A+B+END");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 1);
+        template.sendBodyAndHeader("direct:start", "B", "id", 1);
+        template.sendBodyAndHeader("direct:start", "END", "id", 1);
+
+        assertMockEndpointsSatisfied();
+    }
+
+
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateIgnoreBadCorrelationKeysTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateIgnoreBadCorrelationKeysTest.java?rev=910890&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateIgnoreBadCorrelationKeysTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateIgnoreBadCorrelationKeysTest.java Wed Feb 17 10:11:06 2010
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+
+/**
+ * @version $Revision$
+ */
+public class AggregateIgnoreBadCorrelationKeysTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testAggregateIgnoreBadCorrelationKeys() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        .completionSize(2).ignoreBadCorrelationKeys()
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("A+C");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 1);
+
+        // B should be ignored
+        template.sendBodyAndHeader("direct:start", "B", "id", null);
+
+        template.sendBodyAndHeader("direct:start", "C", "id", 1);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testAggregateNotIgnoreBadCorrelationKeys() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        .completionSize(2)
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("A+C");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 1);
+
+        try {
+            template.sendBodyAndHeader("direct:start", "B", "id", null);
+            fail("Should throw an exception");
+        } catch (CamelExecutionException e) {
+            CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
+            assertEquals("Correlation key could not be evaluated to a value. Exchange[Message: B]", cause.getMessage());
+        }
+
+        template.sendBodyAndHeader("direct:start", "C", "id", 1);
+
+        assertMockEndpointsSatisfied();
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateIgnoreBadCorrelationKeysTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateIgnoreBadCorrelationKeysTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java?rev=910890&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java Wed Feb 17 10:11:06 2010
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+
+/**
+ * @version $Revision$
+ */
+public class AggregateParallelProcessingTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testAggregateParallelProcessing() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        .eagerCheckCompletion().completionPredicate(body().isEqualTo("END")).parallelProcessing()
+                        .to("direct:cool");
+
+                from("direct:cool")
+                    .to("mock:cool")
+                    .choice()
+                        .when(body().contains("Camel")).to("mock:result")
+                        .when(body().contains("Donkey")).delay(2000).to("mock:result");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:cool").expectedBodiesReceivedInAnyOrder("B+Camel+END", "A+Donkey+END"); // arrival order at mock:cool is not asserted
+        getMockEndpoint("mock:result").expectedBodiesReceived("B+Camel+END", "A+Donkey+END"); // Camel group expected before Donkey group
+
+        // the Donkey group (id=1) is sent first, but its branch is delayed, so with parallelProcessing the Camel group (id=2) is expected at mock:result first
+        template.sendBodyAndHeader("direct:start", "A", "id", 1);
+        template.sendBodyAndHeader("direct:start", "Donkey", "id", 1);
+        template.sendBodyAndHeader("direct:start", "END", "id", 1);
+
+        template.sendBodyAndHeader("direct:start", "B", "id", 2);
+        template.sendBodyAndHeader("direct:start", "Camel", "id", 2);
+        template.sendBodyAndHeader("direct:start", "END", "id", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testAggregateNotParallelProcessing() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        .eagerCheckCompletion().completionPredicate(body().isEqualTo("END"))
+                        .to("direct:cool");
+
+                from("direct:cool")
+                    .to("mock:cool")
+                    .choice()
+                        .when(body().contains("Camel")).to("mock:result")
+                        .when(body().contains("Donkey")).delay(2000).to("mock:result");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:cool").expectedBodiesReceived("A+Donkey+END", "B+Camel+END"); // send order is expected to be preserved
+        getMockEndpoint("mock:result").expectedBodiesReceived("A+Donkey+END", "B+Camel+END"); // same order expected despite the Donkey delay
+
+        // parallelProcessing is NOT enabled on this route, so the Donkey group (id=1, sent first) is expected first at both mocks
+        template.sendBodyAndHeader("direct:start", "A", "id", 1);
+        template.sendBodyAndHeader("direct:start", "Donkey", "id", 1);
+        template.sendBodyAndHeader("direct:start", "END", "id", 1);
+
+        template.sendBodyAndHeader("direct:start", "B", "id", 2);
+        template.sendBodyAndHeader("direct:start", "Camel", "id", 2);
+        template.sendBodyAndHeader("direct:start", "END", "id", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=910890&r1=910889&r2=910890&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Wed Feb 17 10:11:06 2010
@@ -348,7 +348,7 @@
             ap.process(e4);
             fail("Should have thrown an exception");
         } catch (CamelExchangeException e) {
-            assertEquals("Correlation key has been closed. Exchange[Message: C]", e.getMessage());
+            assertEquals("The correlation key [123] has been closed. Exchange[Message: C]", e.getMessage());
         }
 
         assertMockEndpointsSatisfied();
@@ -498,5 +498,4 @@
         ap.stop();
     }
 
-
 }