You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/17 11:11:07 UTC
svn commit: r910890 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/model/
main/java/org/apache/camel/processor/aggregate/
main/java/org/apache/camel/util/
test/java/org/apache/camel/processor/aggregator/
Author: davsclaus
Date: Wed Feb 17 10:11:06 2010
New Revision: 910890
URL: http://svn.apache.org/viewvc?rev=910890&view=rev
Log:
CAMEL-1686: Overhaul of aggregator. Work in progress.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/ClosedCorrelationKeyException.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateIgnoreBadCorrelationKeysTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=910890&r1=910889&r2=910890&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Wed Feb 17 10:11:06 2010
@@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -55,6 +56,12 @@
private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
@XmlTransient
private AggregationStrategy aggregationStrategy;
+ @XmlTransient
+ private ExecutorService executorService;
+ @XmlAttribute(required = false)
+ private Boolean parallelProcessing;
+ @XmlAttribute(required = false)
+ private String executorServiceRef;
@XmlAttribute(required = true)
private String strategyRef;
@XmlAttribute(required = false)
@@ -65,6 +72,12 @@
private Boolean completionFromBatchConsumer;
@XmlAttribute(required = false)
private Boolean groupExchanges;
+ @XmlAttribute(required = false)
+ private Boolean eagerCheckCompletion;
+ @XmlAttribute(required = false)
+ private Boolean ignoreBadCorrelationKeys;
+ @XmlAttribute(required = false)
+ private Boolean closeCorrelationKeyOnCompletion;
public AggregateDefinition() {
}
@@ -127,11 +140,16 @@
AggregateProcessor answer = new AggregateProcessor(processor, correlation, strategy);
+ ExecutorService executor = createExecutorService(routeContext);
+ answer.setExecutorService(executor);
+ if (isParallelProcessing() != null) {
+ answer.setParallelProcessing(isParallelProcessing());
+ }
+
if (getCompletionPredicate() != null) {
Predicate predicate = getCompletionPredicate().createPredicate(routeContext);
answer.setCompletionPredicate(predicate);
}
-
if (getCompletionSize() != null) {
answer.setCompletionSize(getCompletionSize());
}
@@ -141,6 +159,15 @@
if (isCompletionFromBatchConsumer() != null) {
answer.setCompletionFromBatchConsumer(isCompletionFromBatchConsumer());
}
+ if (isCloseCorrelationKeyOnCompletion() != null) {
+ answer.setCloseCorrelationKeyOnCompletion(isCloseCorrelationKeyOnCompletion());
+ }
+ if (isEagerCheckCompletion() != null) {
+ answer.setEagerCheckCompletion(isEagerCheckCompletion());
+ }
+ if (isIgnoreBadCorrelationKeys() != null) {
+ answer.setIgnoreBadCorrelationKeys(isIgnoreBadCorrelationKeys());
+ }
return answer;
}
@@ -160,6 +187,13 @@
return strategy;
}
+ private ExecutorService createExecutorService(RouteContext routeContext) {
+ if (executorService == null && executorServiceRef != null) {
+ executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
+ }
+ return executorService;
+ }
+
public AggregationStrategy getAggregationStrategy() {
return aggregationStrategy;
}
@@ -216,10 +250,100 @@
this.completionFromBatchConsumer = completionFromBatchConsumer;
}
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ public Boolean isParallelProcessing() {
+ return parallelProcessing;
+ }
+
+ public void setParallelProcessing(Boolean parallelProcessing) {
+ this.parallelProcessing = parallelProcessing;
+ }
+
+ public String getExecutorServiceRef() {
+ return executorServiceRef;
+ }
+
+ public void setExecutorServiceRef(String executorServiceRef) {
+ this.executorServiceRef = executorServiceRef;
+ }
+
+ public String getStrategyRef() {
+ return strategyRef;
+ }
+
+ public void setStrategyRef(String strategyRef) {
+ this.strategyRef = strategyRef;
+ }
+
+ public Boolean isEagerCheckCompletion() {
+ return eagerCheckCompletion;
+ }
+
+ public void setEagerCheckCompletion(Boolean eagerCheckCompletion) {
+ this.eagerCheckCompletion = eagerCheckCompletion;
+ }
+
+ public Boolean isIgnoreBadCorrelationKeys() {
+ return ignoreBadCorrelationKeys;
+ }
+
+ public void setIgnoreBadCorrelationKeys(Boolean ignoreBadCorrelationKeys) {
+ this.ignoreBadCorrelationKeys = ignoreBadCorrelationKeys;
+ }
+
+ public Boolean isCloseCorrelationKeyOnCompletion() {
+ return closeCorrelationKeyOnCompletion;
+ }
+
+ public void setCloseCorrelationKeyOnCompletion(Boolean closeCorrelationKeyOnCompletion) {
+ this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
+ }
+
// Fluent API
//-------------------------------------------------------------------------
/**
+ * Use eager completion checking which means that the {{completionPredicate}} will use the incoming Exchange.
+ * At opposed to without eager completion checking the {{completionPredicate}} will use the aggregated Exchange.
+ *
+ * @return builder
+ */
+ public AggregateDefinition eagerCheckCompletion() {
+ setEagerCheckCompletion(true);
+ return this;
+ }
+
+ /**
+ * If a correlation key cannot be successfully evaluated it will be ignored by logging a {{DEBUG}} and then just
+ * ignore the incoming Exchange.
+ *
+ * @return builder
+ */
+ public AggregateDefinition ignoreBadCorrelationKeys() {
+ setIgnoreBadCorrelationKeys(true);
+ return this;
+ }
+
+ /**
+ * Closes a correlation key when its complete. Any <i>late</i> received exchanges which has a correlation key
+ * that has been closed, it will be defined and a {@link org.apache.camel.processor.aggregate.ClosedCorrelationKeyException}
+ * is thrown.
+ *
+ * @return builder
+ */
+ public AggregateDefinition closeCorrelationKeyOnCompletion() {
+ setCloseCorrelationKeyOnCompletion(true);
+ return this;
+ }
+
+ /**
* Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
* and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
* as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete.
@@ -313,6 +437,38 @@
return this;
}
+ /**
+ * Sending the aggregated output in parallel
+ *
+ * @return the builder
+ */
+ public AggregateDefinition parallelProcessing() {
+ setParallelProcessing(true);
+ return this;
+ }
+
+ /**
+ * Setting the executor service for executing the sending the aggregated output.
+ *
+ * @param executorService the executor service
+ * @return the builder
+ */
+ public AggregateDefinition executorService(ExecutorService executorService) {
+ setExecutorService(executorService);
+ return this;
+ }
+
+ /**
+ * Setting the executor service for executing the sending the aggregated output.
+ *
+ * @param executorServiceRef reference to the executor service
+ * @return the builder
+ */
+ public AggregateDefinition executorServiceRef(String executorServiceRef) {
+ setExecutorServiceRef(executorServiceRef);
+ return this;
+ }
+
protected void checkNoCompletedPredicate() {
if (getCompletionPredicate() != null) {
throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=910890&r1=910889&r2=910890&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Wed Feb 17 10:11:06 2010
@@ -151,7 +151,7 @@
}
/**
- * Doing the splitting work in parallel
+ * Doing the recipient list work in parallel
*
* @return the builder
*/
@@ -184,6 +184,17 @@
return this;
}
+ /**
+ * Setting the executor service for executing the sending to the recipients.
+ *
+ * @param executorServiceRef reference to the executor service
+ * @return the builder
+ */
+ public RecipientListDefinition executorServiceRef(String executorServiceRef) {
+ setExecutorServiceRef(executorServiceRef);
+ return this;
+ }
+
// Properties
//-------------------------------------------------------------------------
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=910890&r1=910889&r2=910890&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Feb 17 10:11:06 2010
@@ -62,23 +62,21 @@
*/
public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
- // TODO: Add support for parallelProcessing, setting custom ExecutorService like multicast
-
private static final Log LOG = LogFactory.getLog(AggregateProcessor.class);
- private TimeoutMap<Object, Exchange> timeoutMap;
private final Processor processor;
private final AggregationStrategy aggregationStrategy;
private final Expression correlationExpression;
+ private TimeoutMap<Object, Exchange> timeoutMap;
private ExecutorService executorService;
+ private ExceptionHandler exceptionHandler;
private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository();
private Set<Object> closedCorrelationKeys = new HashSet<Object>();
- private ExceptionHandler exceptionHandler;
// options
private boolean ignoreBadCorrelationKeys;
private boolean closeCorrelationKeyOnCompletion;
- private int concurrentConsumers = 1;
+ private boolean parallelProcessing;
// different ways to have completion triggered
private boolean eagerCheckCompletion;
@@ -137,7 +135,7 @@
// is the correlation key closed?
if (isCloseCorrelationKeyOnCompletion()) {
if (closedCorrelationKeys.contains(key)) {
- throw new CamelExchangeException("Correlation key has been closed", exchange);
+ throw new ClosedCorrelationKeyException(key, exchange);
}
}
@@ -344,14 +342,6 @@
this.completionFromBatchConsumer = completionFromBatchConsumer;
}
- public int getConcurrentConsumers() {
- return concurrentConsumers;
- }
-
- public void setConcurrentConsumers(int concurrentConsumers) {
- this.concurrentConsumers = concurrentConsumers;
- }
-
public ExceptionHandler getExceptionHandler() {
if (exceptionHandler == null) {
exceptionHandler = new LoggingExceptionHandler(getClass());
@@ -363,6 +353,14 @@
this.exceptionHandler = exceptionHandler;
}
+ public boolean isParallelProcessing() {
+ return parallelProcessing;
+ }
+
+ public void setParallelProcessing(boolean parallelProcessing) {
+ this.parallelProcessing = parallelProcessing;
+ }
+
/**
* Background tasks that looks for aggregated exchanges which is triggered by completion timeouts.
*/
@@ -391,12 +389,18 @@
ServiceHelper.startService(aggregationRepository);
if (executorService == null) {
- executorService = ExecutorServiceHelper.newFixedThreadPool(getConcurrentConsumers(), "AggregateProcessor", true);
+ if (isParallelProcessing()) {
+ // we are running in parallel so create a default thread pool
+ executorService = ExecutorServiceHelper.newFixedThreadPool(10, "Aggregator", true);
+ } else {
+ // use a single threaded if we are not running in parallel
+ executorService = ExecutorServiceHelper.newFixedThreadPool(1, "Aggregator", true);
+ }
}
// start timeout service if its in use
if (getCompletionTimeout() > 0) {
- ScheduledExecutorService scheduler = ExecutorServiceHelper.newScheduledThreadPool(1, "AggregateProcessorTimeoutCompletion", true);
+ ScheduledExecutorService scheduler = ExecutorServiceHelper.newScheduledThreadPool(1, "AggregateTimeoutChecker", true);
timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);
ServiceHelper.startService(timeoutMap);
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/ClosedCorrelationKeyException.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/ClosedCorrelationKeyException.java?rev=910890&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/ClosedCorrelationKeyException.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/ClosedCorrelationKeyException.java Wed Feb 17 10:11:06 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+
+/**
+ * The correlation key has been closed and the Exchange cannot be aggregated.
+ *
+ * @version $Revision$
+ */
+public class ClosedCorrelationKeyException extends CamelExchangeException {
+
+ // the correlation key is not expected to be serialized
+ private transient Object correlationKey;
+
+ public ClosedCorrelationKeyException(Object correlationKey, Exchange exchange) {
+ super("The correlation key [" + correlationKey + "] has been closed", exchange);
+ this.correlationKey = correlationKey;
+ }
+
+ public ClosedCorrelationKeyException(Object correlationKey, Exchange exchange, Throwable cause) {
+ super("The correlation key [" + correlationKey + "] has been closed", exchange, cause);
+ this.correlationKey = correlationKey;
+ }
+
+ public Object getCorrelationKey() {
+ return correlationKey;
+ }
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/ClosedCorrelationKeyException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/ClosedCorrelationKeyException.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java?rev=910890&r1=910889&r2=910890&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java Wed Feb 17 10:11:06 2010
@@ -123,7 +123,7 @@
public void purge() {
if (log.isTraceEnabled()) {
- log.debug("There are " + map.size() + " in the timeout map");
+ log.trace("There are " + map.size() + " in the timeout map");
}
long now = currentTime();
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java?rev=910890&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java Wed Feb 17 10:11:06 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.processor.aggregate.ClosedCorrelationKeyException;
+
+/**
+ * @version $Revision$
+ */
+public class AggregateClosedCorrelationKeyTest extends ContextTestSupport {
+
+ public void testAggregateClosedCorrelationKey() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("A+B");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 1);
+ template.sendBodyAndHeader("direct:start", "B", "id", 1);
+
+ // should be closed
+ try {
+ template.sendBodyAndHeader("direct:start", "C", "id", 1);
+ fail("Should throw an exception");
+ } catch (CamelExecutionException e) {
+ ClosedCorrelationKeyException cause = assertIsInstanceOf(ClosedCorrelationKeyException.class, e.getCause());
+ assertEquals(1, cause.getCorrelationKey());
+ assertEquals("The correlation key [1] has been closed. Exchange[Message: C]", cause.getMessage());
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInAggregatingStrategy())
+ .completionSize(2).closeCorrelationKeyOnCompletion()
+ .to("mock:result");
+
+ }
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java?rev=910890&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java Wed Feb 17 10:11:06 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+
+/**
+ * @version $Revision$
+ */
+public class AggregateEagerCheckCompletionTest extends ContextTestSupport {
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ public void testAggregateEagerCheckCompletion() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInAggregatingStrategy())
+ .completionPredicate(body().isEqualTo("END")).eagerCheckCompletion()
+ .to("mock:result");
+ }
+ });
+ context.start();
+
+ getMockEndpoint("mock:result").expectedBodiesReceived("A+B+END");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 1);
+ template.sendBodyAndHeader("direct:start", "B", "id", 1);
+ template.sendBodyAndHeader("direct:start", "END", "id", 1);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testAggregateNotEagerCheckCompletion() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInAggregatingStrategy())
+ .completionPredicate(body().isEqualTo("A+B+END"))
+ .to("mock:result");
+ }
+ });
+ context.start();
+
+ getMockEndpoint("mock:result").expectedBodiesReceived("A+B+END");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 1);
+ template.sendBodyAndHeader("direct:start", "B", "id", 1);
+ template.sendBodyAndHeader("direct:start", "END", "id", 1);
+
+ assertMockEndpointsSatisfied();
+ }
+
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateIgnoreBadCorrelationKeysTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateIgnoreBadCorrelationKeysTest.java?rev=910890&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateIgnoreBadCorrelationKeysTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateIgnoreBadCorrelationKeysTest.java Wed Feb 17 10:11:06 2010
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+
+/**
+ * @version $Revision$
+ */
+public class AggregateIgnoreBadCorrelationKeysTest extends ContextTestSupport {
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ public void testAggregateIgnoreBadCorrelationKeys() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInAggregatingStrategy())
+ .completionSize(2).ignoreBadCorrelationKeys()
+ .to("mock:result");
+ }
+ });
+ context.start();
+
+ getMockEndpoint("mock:result").expectedBodiesReceived("A+C");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 1);
+
+ // B should be ignored
+ template.sendBodyAndHeader("direct:start", "B", "id", null);
+
+ template.sendBodyAndHeader("direct:start", "C", "id", 1);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testAggregateNotIgnoreBadCorrelationKeys() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInAggregatingStrategy())
+ .completionSize(2)
+ .to("mock:result");
+ }
+ });
+ context.start();
+
+ getMockEndpoint("mock:result").expectedBodiesReceived("A+C");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 1);
+
+ try {
+ template.sendBodyAndHeader("direct:start", "B", "id", null);
+ fail("Should throw an exception");
+ } catch (CamelExecutionException e) {
+ CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
+ assertEquals("Correlation key could not be evaluated to a value. Exchange[Message: B]", cause.getMessage());
+ }
+
+ template.sendBodyAndHeader("direct:start", "C", "id", 1);
+
+ assertMockEndpointsSatisfied();
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateIgnoreBadCorrelationKeysTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateIgnoreBadCorrelationKeysTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java?rev=910890&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java Wed Feb 17 10:11:06 2010
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+
+/**
+ * @version $Revision$
+ */
+public class AggregateParallelProcessingTest extends ContextTestSupport {
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ public void testAggregateParallelProcessing() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInAggregatingStrategy())
+ .eagerCheckCompletion().completionPredicate(body().isEqualTo("END")).parallelProcessing()
+ .to("direct:cool");
+
+ from("direct:cool")
+ .to("mock:cool")
+ .choice()
+ .when(body().contains("Camel")).to("mock:result")
+ .when(body().contains("Donkey")).delay(2000).to("mock:result");
+ }
+ });
+ context.start();
+
+ getMockEndpoint("mock:cool").expectedBodiesReceivedInAnyOrder("B+Camel+END", "A+Donkey+END");
+ getMockEndpoint("mock:result").expectedBodiesReceived("B+Camel+END", "A+Donkey+END");
+
+ // donkey is maybe first but Camel will arrive first at mock
+ template.sendBodyAndHeader("direct:start", "A", "id", 1);
+ template.sendBodyAndHeader("direct:start", "Donkey", "id", 1);
+ template.sendBodyAndHeader("direct:start", "END", "id", 1);
+
+ template.sendBodyAndHeader("direct:start", "B", "id", 2);
+ template.sendBodyAndHeader("direct:start", "Camel", "id", 2);
+ template.sendBodyAndHeader("direct:start", "END", "id", 2);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testAggregateNotParallelProcessing() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInAggregatingStrategy())
+ .eagerCheckCompletion().completionPredicate(body().isEqualTo("END"))
+ .to("direct:cool");
+
+ from("direct:cool")
+ .to("mock:cool")
+ .choice()
+ .when(body().contains("Camel")).to("mock:result")
+ .when(body().contains("Donkey")).delay(2000).to("mock:result");
+ }
+ });
+ context.start();
+
+ getMockEndpoint("mock:cool").expectedBodiesReceived("A+Donkey+END", "B+Camel+END");
+ getMockEndpoint("mock:result").expectedBodiesReceived("A+Donkey+END", "B+Camel+END");
+
+ // donkey is first as we do NOT run in parallel
+ template.sendBodyAndHeader("direct:start", "A", "id", 1);
+ template.sendBodyAndHeader("direct:start", "Donkey", "id", 1);
+ template.sendBodyAndHeader("direct:start", "END", "id", 1);
+
+ template.sendBodyAndHeader("direct:start", "B", "id", 2);
+ template.sendBodyAndHeader("direct:start", "Camel", "id", 2);
+ template.sendBodyAndHeader("direct:start", "END", "id", 2);
+
+ assertMockEndpointsSatisfied();
+ }
+
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=910890&r1=910889&r2=910890&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Wed Feb 17 10:11:06 2010
@@ -348,7 +348,7 @@
ap.process(e4);
fail("Should have thrown an exception");
} catch (CamelExchangeException e) {
- assertEquals("Correlation key has been closed. Exchange[Message: C]", e.getMessage());
+ assertEquals("The correlation key [123] has been closed. Exchange[Message: C]", e.getMessage());
}
assertMockEndpointsSatisfied();
@@ -498,5 +498,4 @@
ap.stop();
}
-
}