You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/17 09:27:27 UTC
svn commit: r910860 [1/2] - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/main/java/org/apache/camel/processor/aggre...
Author: davsclaus
Date: Wed Feb 17 08:27:25 2010
New Revision: 910860
URL: http://svn.apache.org/viewvc?rev=910860&view=rev
Log:
CAMEL-1686: Overhaul of Aggregator. Work in progress.
Removed:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAggregatorRejectedPutBackTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyReverseAggregationCollection.java
camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/AggregateRssFeedCollection.java
camel/trunk/components/camel-rss/src/test/java/org/apache/camel/component/rss/RssCustomAggregatorTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorWithCustomCollectionTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringMulticastAggregatorTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator-custom-collection.xml
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/multicastAggregator.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateLostGroupIssueTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateMultipleSourceTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorAndOnExceptionTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBeanThrowExceptionTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionHandleTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTimerAndTracerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/view/DotViewTest.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator-custom-strategy.xml
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator.xml
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/scattergather/scatter-gather.xml
camel/trunk/components/camel-web/src/main/java/org/apache/camel/web/util/AggregateDefinitionRenderer.java
camel/trunk/components/camel-web/src/test/java/org/apache/camel/web/groovy/AggregateDSLTest.java
camel/trunk/examples/camel-example-cafe/src/main/java/org/apache/camel/example/cafe/CafeRouteBuilder.java
camel/trunk/examples/camel-example-cafe/src/main/resources/META-INF/spring/camel-context.xml
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Wed Feb 17 08:27:25 2010
@@ -150,10 +150,6 @@
}
try {
- // now lets dispatch
- if (LOG.isDebugEnabled()) {
- LOG.debug(">>>> " + endpoint + " " + exchange);
- }
// invoke the callback
return callback.doInProducer(producer, exchange, pattern);
} finally {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Wed Feb 17 08:27:25 2010
@@ -18,7 +18,6 @@
import java.util.ArrayList;
import java.util.List;
-
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -32,11 +31,10 @@
import org.apache.camel.Processor;
import org.apache.camel.builder.ExpressionClause;
import org.apache.camel.model.language.ExpressionDefinition;
-import org.apache.camel.processor.Aggregator;
-import org.apache.camel.processor.aggregate.AggregationCollection;
+import org.apache.camel.processor.UnitOfWorkProcessor;
+import org.apache.camel.processor.aggregate.AggregateProcessor;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
-import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.spi.RouteContext;
/**
@@ -47,32 +45,26 @@
@XmlRootElement(name = "aggregate")
@XmlAccessorType(XmlAccessType.FIELD)
public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> {
- @XmlElement(name = "correlationExpression", required = false)
+ @XmlElement(name = "correlationExpression", required = true)
private ExpressionSubElementDefinition correlationExpression;
+ @XmlElement(name = "completionPredicate", required = false)
+ private ExpressionSubElementDefinition completionPredicate;
@XmlTransient
private ExpressionDefinition expression;
@XmlElementRef
private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
@XmlTransient
private AggregationStrategy aggregationStrategy;
- @XmlTransient
- private AggregationCollection aggregationCollection;
- @XmlAttribute(required = false)
- private Integer batchSize;
- @XmlAttribute(required = false)
- private Integer outBatchSize;
+ @XmlAttribute(required = true)
+ private String strategyRef;
@XmlAttribute(required = false)
- private Long batchTimeout;
+ private Integer completionSize;
@XmlAttribute(required = false)
- private String strategyRef;
+ private Long completionTimeout;
@XmlAttribute(required = false)
- private String collectionRef;
+ private Boolean completionFromBatchConsumer;
@XmlAttribute(required = false)
private Boolean groupExchanges;
- @XmlAttribute(required = false)
- private Boolean batchSizeFromConsumer;
- @XmlElement(name = "completionPredicate", required = false)
- private ExpressionSubElementDefinition completionPredicate;
public AggregateDefinition() {
}
@@ -125,65 +117,32 @@
return clause;
}
- protected Aggregator createAggregator(RouteContext routeContext) throws Exception {
- final Processor processor = routeContext.createProcessor(this);
+ protected AggregateProcessor createAggregator(RouteContext routeContext) throws Exception {
+ Processor processor = routeContext.createProcessor(this);
+ // wrap the aggregated route in a unit of work processor
+ processor = new UnitOfWorkProcessor(routeContext, processor);
+
+ Expression correlation = getExpression().createExpression(routeContext);
+ AggregationStrategy strategy = createAggregationStrategy(routeContext);
+
+ AggregateProcessor answer = new AggregateProcessor(processor, correlation, strategy);
+
+ if (getCompletionPredicate() != null) {
+ Predicate predicate = getCompletionPredicate().createPredicate(routeContext);
+ answer.setCompletionPredicate(predicate);
+ }
- final Aggregator aggregator;
- if (getAggregationCollection() == null) {
- setAggregationCollection(createAggregationCollection(routeContext));
- }
-
- if (aggregationCollection != null) {
- // create the aggregator using the collection
- // pre configure the collection if its expression and strategy is not set, then
- // use the ones that is pre configured with this type
- if (aggregationCollection.getCorrelationExpression() == null) {
- aggregationCollection.setCorrelationExpression(getExpression());
- }
- if (aggregationCollection.getAggregationStrategy() == null) {
- AggregationStrategy strategy = createAggregationStrategy(routeContext);
- aggregationCollection.setAggregationStrategy(strategy);
- }
- aggregator = new Aggregator(processor, aggregationCollection);
- } else {
- // create the aggregator using a default collection
- AggregationStrategy strategy = createAggregationStrategy(routeContext);
-
- if (getExpression() == null) {
- throw new IllegalArgumentException("You need to specify an expression or "
- + "aggregation collection for this aggregator: " + this);
- }
-
- Expression aggregateExpression = getExpression().createExpression(routeContext);
-
- Predicate predicate = null;
- if (getCompletionPredicate() != null) {
- predicate = getCompletionPredicate().createPredicate(routeContext);
- }
- if (predicate != null) {
- aggregator = new Aggregator(processor, aggregateExpression, strategy, predicate);
- } else {
- aggregator = new Aggregator(processor, aggregateExpression, strategy);
- }
- }
-
- if (batchSize != null) {
- aggregator.setBatchSize(batchSize);
- }
- if (batchTimeout != null) {
- aggregator.setBatchTimeout(batchTimeout);
- }
- if (outBatchSize != null) {
- aggregator.setOutBatchSize(outBatchSize);
+ if (getCompletionSize() != null) {
+ answer.setCompletionSize(getCompletionSize());
}
- if (groupExchanges != null) {
- aggregator.setGroupExchanges(groupExchanges);
+ if (getCompletionTimeout() != null) {
+ answer.setCompletionTimeout(getCompletionTimeout());
}
- if (batchSizeFromConsumer != null) {
- aggregator.setBatchConsumer(batchSizeFromConsumer);
+ if (isCompletionFromBatchConsumer() != null) {
+ answer.setCompletionFromBatchConsumer(isCompletionFromBatchConsumer());
}
- return aggregator;
+ return answer;
}
private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
@@ -191,35 +150,16 @@
if (strategy == null && strategyRef != null) {
strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
}
- // pick a default strategy
+ if (strategy == null && groupExchanges != null && groupExchanges) {
+ // if grouped exchange is enabled then use special strategy for that
+ strategy = new GroupedExchangeAggregationStrategy();
+ }
if (strategy == null) {
- if (groupExchanges != null && groupExchanges) {
- // if grouped exchange is enabled then use special strategy for that
- strategy = new GroupedExchangeAggregationStrategy();
- } else {
- // fallback to use latest
- strategy = new UseLatestAggregationStrategy();
- }
+ throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this);
}
return strategy;
}
- private AggregationCollection createAggregationCollection(RouteContext routeContext) {
- AggregationCollection collection = getAggregationCollection();
- if (collection == null && collectionRef != null) {
- collection = routeContext.lookup(collectionRef, AggregationCollection.class);
- }
- return collection;
- }
-
- public AggregationCollection getAggregationCollection() {
- return aggregationCollection;
- }
-
- public void setAggregationCollection(AggregationCollection aggregationCollection) {
- this.aggregationCollection = aggregationCollection;
- }
-
public AggregationStrategy getAggregationStrategy() {
return aggregationStrategy;
}
@@ -228,55 +168,39 @@
this.aggregationStrategy = aggregationStrategy;
}
- public Integer getBatchSize() {
- return batchSize;
- }
-
- public void setBatchSize(Integer batchSize) {
- this.batchSize = batchSize;
- }
-
- public Integer getOutBatchSize() {
- return outBatchSize;
- }
-
- public void setOutBatchSize(Integer outBatchSize) {
- this.outBatchSize = outBatchSize;
+ public String getAggregationStrategyRef() {
+ return strategyRef;
}
- public Long getBatchTimeout() {
- return batchTimeout;
+ public void setAggregationStrategyRef(String aggregationStrategyRef) {
+ this.strategyRef = aggregationStrategyRef;
}
- public void setBatchTimeout(Long batchTimeout) {
- this.batchTimeout = batchTimeout;
+ public Integer getCompletionSize() {
+ return completionSize;
}
- public String getStrategyRef() {
- return strategyRef;
+ public void setCompletionSize(Integer completionSize) {
+ this.completionSize = completionSize;
}
- public void setStrategyRef(String strategyRef) {
- this.strategyRef = strategyRef;
+ public Long getCompletionTimeout() {
+ return completionTimeout;
}
- public String getCollectionRef() {
- return collectionRef;
+ public void setCompletionTimeout(Long completionTimeout) {
+ this.completionTimeout = completionTimeout;
}
- public void setCollectionRef(String collectionRef) {
- this.collectionRef = collectionRef;
+ public ExpressionSubElementDefinition getCompletionPredicate() {
+ return completionPredicate;
}
public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) {
this.completionPredicate = completionPredicate;
}
- public ExpressionSubElementDefinition getCompletionPredicate() {
- return completionPredicate;
- }
-
- public Boolean getGroupExchanges() {
+ public Boolean isGroupExchanges() {
return groupExchanges;
}
@@ -284,12 +208,12 @@
this.groupExchanges = groupExchanges;
}
- public Boolean getBatchSizeFromConsumer() {
- return batchSizeFromConsumer;
+ public Boolean isCompletionFromBatchConsumer() {
+ return completionFromBatchConsumer;
}
- public void setBatchSizeFromConsumer(Boolean batchSizeFromConsumer) {
- this.batchSizeFromConsumer = batchSizeFromConsumer;
+ public void setCompletionFromBatchConsumer(Boolean completionFromBatchConsumer) {
+ this.completionFromBatchConsumer = completionFromBatchConsumer;
}
// Fluent API
@@ -298,56 +222,36 @@
/**
* Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
* and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
- * as total by setting the exchange property {@link org.apache.camel.Exchange#BATCH_SIZE}.
- *
- * @return builder
- */
- public AggregateDefinition batchSizeFromConsumer() {
- setBatchSizeFromConsumer(true);
- return this;
- }
-
- /**
- * Sets the in batch size for number of exchanges received
+ * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when it is complete.
*
- * @param batchSize the batch size
* @return builder
*/
- public AggregateDefinition batchSize(int batchSize) {
- setBatchSize(batchSize);
+ public AggregateDefinition completionFromBatchConsumer() {
+ setCompletionFromBatchConsumer(true);
return this;
}
/**
- * Sets the out batch size for number of exchanges sent
+ * Sets the completion size, which is the number of aggregated exchanges which would
+ * cause the aggregate to consider the group as complete and send out the aggregated exchange.
*
- * @param batchSize the batch size
+ * @param completionSize the completion size
* @return builder
*/
- public AggregateDefinition outBatchSize(int batchSize) {
- setOutBatchSize(batchSize);
+ public AggregateDefinition completionSize(int completionSize) {
+ setCompletionSize(completionSize);
return this;
}
/**
- * Sets the batch timeout
+ * Sets the completion timeout, which would cause the aggregate to consider the group as complete
+ * and send out the aggregated exchange.
*
- * @param batchTimeout the timeout in millis
+ * @param completionTimeout the timeout in millis
* @return the builder
*/
- public AggregateDefinition batchTimeout(long batchTimeout) {
- setBatchTimeout(batchTimeout);
- return this;
- }
-
- /**
- * Sets the aggregate collection to use
- *
- * @param aggregationCollection the aggregate collection to use
- * @return the builder
- */
- public AggregateDefinition aggregationCollection(AggregationCollection aggregationCollection) {
- setAggregationCollection(aggregationCollection);
+ public AggregateDefinition completionTimeout(long completionTimeout) {
+ setCompletionTimeout(completionTimeout);
return this;
}
@@ -363,24 +267,13 @@
}
/**
- * Sets the aggregate collection to use
- *
- * @param collectionRef reference to the aggregate collection to lookup in the registry
- * @return the builder
- */
- public AggregateDefinition collectionRef(String collectionRef) {
- setCollectionRef(collectionRef);
- return this;
- }
-
- /**
* Sets the aggregate strategy to use
*
- * @param strategyRef reference to the strategy to lookup in the registry
+ * @param aggregationStrategyRef reference to the strategy to lookup in the registry
* @return the builder
*/
- public AggregateDefinition strategyRef(String strategyRef) {
- setStrategyRef(strategyRef);
+ public AggregateDefinition aggregationStrategyRef(String aggregationStrategyRef) {
+ setAggregationStrategyRef(aggregationStrategyRef);
return this;
}
@@ -412,6 +305,7 @@
* Sets the predicate used to determine if the aggregation is completed
*
* @param predicate the predicate
+ * @return the builder
*/
public AggregateDefinition completionPredicate(Predicate predicate) {
checkNoCompletedPredicate();
@@ -436,7 +330,7 @@
// Section - Methods from ExpressionNode
// Needed to copy methods from ExpressionNode here so that I could specify the
// correlation expression as optional in JAXB
-
+
public ExpressionDefinition getExpression() {
if (expression == null && correlationExpression != null) {
expression = correlationExpression.getExpressionType();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Wed Feb 17 08:27:25 2010
@@ -44,14 +44,12 @@
import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.builder.ExpressionClause;
import org.apache.camel.builder.ProcessorBuilder;
-import org.apache.camel.language.simple.SimpleLanguage;
import org.apache.camel.model.language.ConstantExpression;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.model.language.LanguageExpression;
import org.apache.camel.processor.DefaultChannel;
import org.apache.camel.processor.InterceptEndpointProcessor;
import org.apache.camel.processor.Pipeline;
-import org.apache.camel.processor.aggregate.AggregationCollection;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.interceptor.Delayer;
import org.apache.camel.processor.interceptor.HandleFault;
@@ -1379,20 +1377,6 @@
* <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
* Creates an aggregator allowing you to combine a number of messages together into a single message.
*
- * @param aggregationCollection the collection used to perform the aggregation
- * @return the builder
- */
- public AggregateDefinition aggregate(AggregationCollection aggregationCollection) {
- AggregateDefinition answer = new AggregateDefinition();
- answer.setAggregationCollection(aggregationCollection);
- addOutput(answer);
- return answer;
- }
-
- /**
- * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
- * Creates an aggregator allowing you to combine a number of messages together into a single message.
- *
* @param correlationExpression the expression used to calculate the
* correlation key. For a JMS message this could be the
* expression <code>header("JMSDestination")</code> or
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Wed Feb 17 08:27:25 2010
@@ -46,11 +46,11 @@
* A base class for any kind of {@link Processor} which implements some kind of batch processing.
*
* @version $Revision$
+ * @deprecated will be removed in Camel 2.4
*/
+@Deprecated
public class BatchProcessor extends ServiceSupport implements Processor, Navigate<Processor> {
- // TODO: Should aggregate on the fly as well
-
public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
public static final int DEFAULT_BATCH_SIZE = 100;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java Wed Feb 17 08:27:25 2010
@@ -35,6 +35,8 @@
*/
public class Resequencer extends BatchProcessor implements Traceable {
+ // TODO: Rework to avoid using BatchProcessor
+
public Resequencer(Processor processor, Expression expression) {
this(processor, createSet(expression));
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Feb 17 08:27:25 2010
@@ -31,6 +31,7 @@
import org.apache.camel.Processor;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.processor.Traceable;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.DefaultTimeoutMap;
import org.apache.camel.util.ExchangeHelper;
@@ -43,11 +44,25 @@
import org.apache.commons.logging.LogFactory;
/**
- * <a href="http://camel.apache.org/aggregator.html">Aggregator</a> EIP pattern.
+ * An implementation of the <a
+ * href="http://camel.apache.org/aggregator2.html">Aggregator</a>
+ * pattern where a batch of messages are processed (up to a maximum amount or
+ * until some timeout is reached) and messages for the same correlation key are
+ * combined together using some kind of {@link AggregationStrategy}
+ * (by default the latest message is used) to compress many message exchanges
+ * into a smaller number of exchanges.
+ * <p/>
+ * A good example of this is stock market data; you may be receiving 30,000
+ * messages/second and you may want to throttle it right down so that multiple
+ * messages for the same stock are combined (or just the latest message is used
+ * and older prices are discarded). Another idea is to combine line item messages
+ * together into a single invoice message.
*
* @version $Revision$
*/
-public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor> {
+public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
+
+ // TODO: Add support for parallelProcessing, setting custom ExecutorService like multicast
private static final Log LOG = LogFactory.getLog(AggregateProcessor.class);
@@ -63,14 +78,15 @@
// options
private boolean ignoreBadCorrelationKeys;
private boolean closeCorrelationKeyOnCompletion;
- private boolean useBatchSizeFromConsumer;
private int concurrentConsumers = 1;
// different ways to have completion triggered
private boolean eagerCheckCompletion;
private Predicate completionPredicate;
private long completionTimeout;
- private int completionAggregatedSize;
+ private int completionSize;
+ private boolean completionFromBatchConsumer;
+ private int batchConsumerCounter;
public AggregateProcessor(Processor processor, Expression correlationExpression, AggregationStrategy aggregationStrategy) {
ObjectHelper.notNull(processor, "processor");
@@ -86,6 +102,10 @@
return "AggregateProcessor[to: " + processor + "]";
}
+ public String getTraceLabel() {
+ return "aggregate[" + correlationExpression + "]";
+ }
+
public List<Processor> next() {
if (!hasNext()) {
return null;
@@ -121,63 +141,68 @@
}
}
- // if batch consumer is enabled then we need to adjust the batch size
- // with the size from the batch consumer
- if (isUseBatchSizeFromConsumer()) {
- int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
- if (size > 0 && size != completionAggregatedSize) {
- completionAggregatedSize = size;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Using batch consumer completion, so setting completionAggregatedSize to: " + completionAggregatedSize);
- }
- }
+ doAggregation(key, exchange);
+ }
+
+ private synchronized Exchange doAggregation(Object key, Exchange exchange) {
+ // TODO: lock this based on keys so we can run in parallel groups
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("+++ start +++ onAggregation for key " + key);
}
+ Exchange answer;
Exchange oldExchange = aggregationRepository.get(key);
Exchange newExchange = exchange;
Integer size = 1;
if (oldExchange != null) {
- size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class);
- ObjectHelper.notNull(size, Exchange.AGGREGATED_SIZE + " on " + oldExchange);
+ size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, 0, Integer.class);
size++;
}
// check if we are complete
boolean complete = false;
if (isEagerCheckCompletion()) {
- complete = isCompleted(key, exchange, size);
+ // put the current aggregated size on the exchange so it is available during the completion check
+ newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
+ complete = isCompleted(key, newExchange);
+ // remove it afterwards
+ newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
}
// prepare the exchanges for aggregation and aggregate it
ExchangeHelper.prepareAggregation(oldExchange, newExchange);
- newExchange = onAggregation(oldExchange, newExchange);
- newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
+ answer = onAggregation(oldExchange, exchange);
+ answer.setProperty(Exchange.AGGREGATED_SIZE, size);
// maybe we should check completion after the aggregation
if (!isEagerCheckCompletion()) {
- // use the new aggregated exchange when testing
- complete = isCompleted(key, newExchange, size);
+ // put the current aggregated size on the exchange so it is available during the completion check
+ answer.setProperty(Exchange.AGGREGATED_SIZE, size);
+ complete = isCompleted(key, answer);
}
// only need to update aggregation repository if we are not complete
- if (!complete && !newExchange.equals(oldExchange)) {
+ if (!complete) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Put exchange:" + newExchange + " with correlation key:" + key);
+ LOG.trace("In progress aggregated exchange: " + answer + " with correlation key:" + key);
}
- aggregationRepository.add(key, newExchange);
+ aggregationRepository.add(key, answer);
}
if (complete) {
- onCompletion(key, newExchange);
+ onCompletion(key, answer, false);
}
- }
- protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) {
- return aggregationStrategy.aggregate(oldExchange, newExchange);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("+++ end +++ onAggregation for key " + key + " with size " + size);
+ }
+
+ return answer;
}
- protected boolean isCompleted(Object key, Exchange exchange, int size) {
+ protected boolean isCompleted(Object key, Exchange exchange) {
if (getCompletionPredicate() != null) {
boolean answer = getCompletionPredicate().matches(exchange);
if (answer) {
@@ -185,27 +210,44 @@
}
}
- if (getCompletionAggregatedSize() > 0) {
- if (size >= getCompletionAggregatedSize()) {
+ if (getCompletionSize() > 0) {
+ int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
+ if (size >= getCompletionSize()) {
return true;
}
}
if (getCompletionTimeout() > 0) {
// timeout is used so use the timeout map to keep an eye on this
- if (LOG.isDebugEnabled()) {
- LOG.debug("Updating correlation key " + key + " to timeout after " + getCompletionTimeout() + " ms.");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Updating correlation key " + key + " to timeout after "
+ + getCompletionTimeout() + " ms. as exchange received: " + exchange);
}
timeoutMap.put(key, exchange, getCompletionTimeout());
}
+ if (isCompletionFromBatchConsumer()) {
+ batchConsumerCounter++;
+ int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
+ if (size > 0 && batchConsumerCounter >= size) {
+ // batch consumer is complete
+ batchConsumerCounter = 0;
+ return true;
+ }
+ }
+
return false;
}
- protected void onCompletion(Object key, final Exchange exchange) {
- // remove from repository and timeout map as its completed
+ protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) {
+ return aggregationStrategy.aggregate(oldExchange, newExchange);
+ }
+
+ protected void onCompletion(Object key, final Exchange exchange, boolean fromTimeout) {
+ // remove from repository as its completed
aggregationRepository.remove(key);
- if (timeoutMap != null) {
+ if (!fromTimeout && timeoutMap != null) {
+ // clean up the timeout map if it was an incoming exchange which triggered the completion (and not the timeout checker)
timeoutMap.remove(key);
}
@@ -270,12 +312,12 @@
this.completionTimeout = completionTimeout;
}
- public int getCompletionAggregatedSize() {
- return completionAggregatedSize;
+ public int getCompletionSize() {
+ return completionSize;
}
- public void setCompletionAggregatedSize(int completionAggregatedSize) {
- this.completionAggregatedSize = completionAggregatedSize;
+ public void setCompletionSize(int completionSize) {
+ this.completionSize = completionSize;
}
public boolean isIgnoreBadCorrelationKeys() {
@@ -294,12 +336,12 @@
this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
}
- public boolean isUseBatchSizeFromConsumer() {
- return useBatchSizeFromConsumer;
+ public boolean isCompletionFromBatchConsumer() {
+ return completionFromBatchConsumer;
}
- public void setUseBatchSizeFromConsumer(boolean useBatchSizeFromConsumer) {
- this.useBatchSizeFromConsumer = useBatchSizeFromConsumer;
+ public void setCompletionFromBatchConsumer(boolean completionFromBatchConsumer) {
+ this.completionFromBatchConsumer = completionFromBatchConsumer;
}
public int getConcurrentConsumers() {
@@ -334,14 +376,14 @@
if (log.isDebugEnabled()) {
log.debug("Completion timeout triggered for correlation key: " + entry.getKey());
}
- onCompletion(entry.getKey(), entry.getValue());
+ onCompletion(entry.getKey(), entry.getValue(), true);
return true;
}
}
@Override
protected void doStart() throws Exception {
- if (getCompletionTimeout() <= 0 && getCompletionAggregatedSize() <= 0 && getCompletionPredicate() == null) {
+ if (getCompletionTimeout() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null) {
throw new IllegalStateException("At least one of the completions options"
+ " [completionTimeout, completionAggregatedSize, completionPredicate] must be set");
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java Wed Feb 17 08:27:25 2010
@@ -31,6 +31,7 @@
*
* @version $Revision$
*/
+@Deprecated
public interface AggregationCollection extends Collection<Exchange> {
/**
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java Wed Feb 17 08:27:25 2010
@@ -36,6 +36,7 @@
*
* @version $Revision$
*/
+@Deprecated
public class DefaultAggregationCollection extends AbstractCollection<Exchange> implements AggregationCollection {
private static final transient Log LOG = LogFactory.getLog(DefaultAggregationCollection.class);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java Wed Feb 17 08:27:25 2010
@@ -30,6 +30,7 @@
*
* @version $Revision$
*/
+@Deprecated
public class PredicateAggregationCollection extends DefaultAggregationCollection {
private Predicate aggregationCompletedPredicate;
private List<Exchange> collection = new ArrayList<Exchange>();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java Wed Feb 17 08:27:25 2010
@@ -16,14 +16,14 @@
*/
package org.apache.camel.util;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Service;
import org.apache.commons.logging.Log;
@@ -38,11 +38,11 @@
protected final transient Log log = LogFactory.getLog(getClass());
- private final Map<K, TimeoutMapEntry<K, V>> map = new HashMap<K, TimeoutMapEntry<K, V>>();
- private final SortedSet<TimeoutMapEntry<K, V>> index = new TreeSet<TimeoutMapEntry<K, V>>();
+ private final ConcurrentMap<K, TimeoutMapEntry<K, V>> map = new ConcurrentHashMap<K, TimeoutMapEntry<K, V>>();
private final ScheduledExecutorService executor;
private final long purgePollTime;
private final long initialDelay = 1000L;
+ private final Lock lock = new ReentrantLock();
public DefaultTimeoutMap() {
this(null, 1000L);
@@ -56,82 +56,91 @@
public V get(K key) {
TimeoutMapEntry<K, V> entry;
- synchronized (map) {
+ lock.lock();
+ try {
entry = map.get(key);
if (entry == null) {
return null;
}
- index.remove(entry);
updateExpireTime(entry);
- index.add(entry);
+ } finally {
+ lock.unlock();
}
return entry.getValue();
}
public void put(K key, V value, long timeoutMillis) {
TimeoutMapEntry<K, V> entry = new TimeoutMapEntry<K, V>(key, value, timeoutMillis);
- synchronized (map) {
- TimeoutMapEntry<K, V> oldValue = map.put(key, entry);
- if (oldValue != null) {
- index.remove(oldValue);
- }
+ lock.lock();
+ try {
+ map.put(key, entry);
updateExpireTime(entry);
- index.add(entry);
+ } finally {
+ lock.unlock();
}
}
public void remove(K id) {
- synchronized (map) {
- TimeoutMapEntry entry = map.remove(id);
- if (entry != null) {
- index.remove(entry);
- }
+ lock.lock();
+ try {
+ map.remove(id);
+ } finally {
+ lock.unlock();
}
}
public Object[] getKeys() {
- Object[] keys = null;
- synchronized (map) {
+ Object[] keys;
+ lock.lock();
+ try {
Set<K> keySet = map.keySet();
keys = new Object[keySet.size()];
keySet.toArray(keys);
+ } finally {
+ lock.unlock();
}
return keys;
}
public int size() {
- synchronized (map) {
- return map.size();
- }
+ return map.size();
}
/**
* The timer task which purges old requests and schedules another poll
*/
public void run() {
- purge();
+ if (log.isTraceEnabled()) {
+ log.trace("Running purge task to see if any entries has been timed out");
+ }
+ try {
+ purge();
+ } catch (Throwable t) {
+ // must catch and log exception otherwise the executor will not schedule next run
+ log.error("Exception occurred during purge task", t);
+ }
}
public void purge() {
+ if (log.isTraceEnabled()) {
+ log.debug("There are " + map.size() + " in the timeout map");
+ }
long now = currentTime();
- synchronized (map) {
- for (Iterator<TimeoutMapEntry<K, V>> iter = index.iterator(); iter.hasNext();) {
- TimeoutMapEntry<K, V> entry = iter.next();
- if (entry == null) {
- break;
- }
- if (entry.getExpireTime() < now) {
- if (isValidForEviction(entry)) {
+
+ lock.lock();
+ try {
+ for (Map.Entry<K, TimeoutMapEntry<K, V>> entry : map.entrySet()) {
+ if (entry.getValue().getExpireTime() < now) {
+ if (isValidForEviction(entry.getValue())) {
if (log.isDebugEnabled()) {
log.debug("Evicting inactive request for correlationID: " + entry);
}
- map.remove(entry.getKey());
- iter.remove();
+ map.remove(entry.getKey(), entry.getValue());
}
- } else {
- break;
}
}
+ } finally {
+ lock.unlock();
}
}
@@ -182,6 +191,5 @@
executor.shutdown();
}
map.clear();
- index.clear();
}
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java Wed Feb 17 08:27:25 2010
@@ -39,7 +39,9 @@
.setHeader("id", simple("${file:onlyname.noext}"))
.threads(20)
.beanRef("business")
- .aggregate(header("country"), new MyBusinessTotal()).batchSizeFromConsumer().batchTimeout(60000).to("mock:result");
+ .aggregate(header("country"), new MyBusinessTotal())
+ .completionTimeout(2000L)
+ .to("mock:result");
}
});
@@ -61,7 +63,9 @@
from("file://target/concurrent?delay=60000&initialDelay=2500")
.setHeader("id", simple("${file:onlyname.noext}"))
.beanRef("business")
- .aggregate(header("country"), new MyBusinessTotal()).batchSizeFromConsumer().batchTimeout(60000).to("mock:result");
+ .aggregate(header("country"), new MyBusinessTotal())
+ .completionTimeout(2000L)
+ .to("mock:result");
}
});
@@ -73,7 +77,7 @@
assertMockEndpointsSatisfied();
long delta = System.currentTimeMillis() - start;
- LOG.debug("Time taken sequentiel: " + delta);
+ LOG.debug("Time taken sequential: " + delta);
}
}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java Wed Feb 17 08:27:25 2010
@@ -68,7 +68,10 @@
.setHeader("id", simple("${file:onlyname.noext}"))
.threads(20)
.beanRef("business")
- .aggregate(header("country"), new MyBusinessTotal()).batchSize(10).batchTimeout(60000).to("mock:result");
+ .log("Country is ${in.header.country}")
+ .aggregate(header("country"), new MyBusinessTotal())
+ .completionTimeout(2000L)
+ .to("mock:result");
}
});
@@ -90,7 +93,9 @@
from("file://target/concurrent?delay=60000&initialDelay=2500")
.setHeader("id", simple("${file:onlyname.noext}"))
.beanRef("business")
- .aggregate(header("country"), new MyBusinessTotal()).batchSize(10).batchTimeout(60000).to("mock:result");
+ .aggregate(header("country"), new MyBusinessTotal())
+ .completionTimeout(2000L)
+ .to("mock:result");
}
});
@@ -142,7 +147,7 @@
}
Integer add = newExchange.getIn().getBody(Integer.class);
int total = current.intValue() + add.intValue();
- LOG.debug("Aggregated sum so far: " + total + " for country: " + country);
+ LOG.info("Aggregated sum so far: " + total + " for country: " + country);
answer.getIn().setBody(total);
return answer;
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java Wed Feb 17 08:27:25 2010
@@ -16,9 +16,13 @@
*/
package org.apache.camel.processor;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
/**
* @version $Revision$
@@ -44,19 +48,15 @@
testSendALargeBatch("direct:predicate");
}
- public void testOutBatchPredicate() throws Exception {
- testSendALargeBatch("direct:outBatchPredicate");
- }
-
- public void testOutBatchWithNoInBatching() throws Exception {
- testSendALargeBatch("direct:outBatchNoInBatching");
- }
-
public void testOneMessage() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(1);
- template.sendBodyAndHeader("direct:predicate", "test", "aggregated", 5);
+ Map headers = new HashMap();
+ headers.put("cheese", 123);
+ headers.put("aggregated", 5);
+
+ template.sendBodyAndHeaders("direct:predicate", "test", headers);
resultEndpoint.assertIsSatisfied();
}
@@ -84,25 +84,20 @@
// START SNIPPET: ex
// in this route we aggregate all from direct:state based on the header id cheese
- from("direct:start").aggregate(header("cheese")).to("mock:result");
-
- from("seda:header").setHeader("visited", constant(true)).aggregate(header("cheese")).to("mock:result");
+ from("direct:start")
+ .aggregate(header("cheese"), new UseLatestAggregationStrategy()).completionTimeout(1000L)
+ .to("mock:result");
+
+ from("seda:header").setHeader("visited", constant(true))
+ .aggregate(header("cheese"), new UseLatestAggregationStrategy()).completionTimeout(1000L)
+ .to("mock:result");
// in this sample we aggregate using our own strategy with a completion predicate
// stating that the aggregated header is equal to 5.
- from("direct:predicate").aggregate(header("cheese"), new MyAggregationStrategy()).
- completionPredicate(header("aggregated").isEqualTo(5)).to("mock:result");
-
- // this sample is similar to the one above but it also illustrates the use of outBatchSize
- // to send exchanges to mock:endpoint in batches of 10.
- from("direct:outBatchPredicate").aggregate(header("cheese"), new MyAggregationStrategy()).
- completionPredicate(header("aggregated").isEqualTo(5)).outBatchSize(10).to("mock:result");
+ from("direct:predicate")
+ .aggregate(header("cheese"), new MyAggregationStrategy()).completionPredicate(header("aggregated").isEqualTo(5))
+ .to("mock:result");
// END SNIPPET: ex
-
- // turning off in batching (batchSize = 1) is a good way to test "out" batching. Don't include
- // in wiki snippet as it may not be a good example to follow.
- from("direct:outBatchNoInBatching").aggregate(header("cheese"), new MyAggregationStrategy()).
- completionPredicate(header("aggregated").isEqualTo(5)).batchSize(1).outBatchSize(10).to("mock:result");
}
};
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java Wed Feb 17 08:27:25 2010
@@ -94,7 +94,9 @@
.to("seda:aggregate");
// collect and re-assemble the validated OrderItems into an order again
- from("seda:aggregate").aggregate(new MyOrderAggregationStrategy()).header("orderId").to("mock:result");
+ from("seda:aggregate")
+ .aggregate(new MyOrderAggregationStrategy()).header("orderId").completionTimeout(1000L)
+ .to("mock:result");
// END SNIPPET: e2
}
};
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java Wed Feb 17 08:27:25 2010
@@ -125,7 +125,7 @@
public void configure() throws Exception {
from("direct:start")
.aggregate(header("id"), new BodyInAggregatingStrategy())
- .completionPredicate(body().contains("END")).batchTimeout(20000)
+ .completionPredicate(body().contains("END")).completionTimeout(20000)
.to("mock:aggregated");
}
};
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java Wed Feb 17 08:27:25 2010
@@ -20,6 +20,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
import org.apache.camel.processor.aggregate.AggregationStrategy;
public class AggregateFromWireTapTest extends ContextTestSupport {
@@ -50,10 +51,10 @@
from("direct:tap")
// just use a constant correlation expression as we want to agg everything
// in the same group. set batch size to two which means to fire when we
- // have received 2 incoming messages, if not the timeout of 5 sec will kick in
- .aggregate(new MyAggregationStrategy()).constant(true).batchSize(2)
- .batchTimeout(5000L)
- .to("direct:aggregated")
+ // have aggregated 2 messages, if not the timeout of 5 sec will kick in
+ .aggregate(constant(true), new MyAggregationStrategy())
+ .completionSize(2).completionTimeout(5000L)
+ .to("direct:aggregated")
.end();
from("direct:aggregated")
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java Wed Feb 17 08:27:25 2010
@@ -74,9 +74,9 @@
// our route is aggregating from the direct queue and sending the response to the mock
from("direct:start")
// aggregated all use same expression
- .aggregate().constant(true).batchSize(2)
+ .aggregate().constant(true).completionSize(2)
// wait for 0.5 seconds to aggregate
- .batchTimeout(500L)
+ .completionTimeout(500L)
// group the exchanges so we get one single exchange containing all the others
.groupExchanges()
.to("mock:result");
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java Wed Feb 17 08:27:25 2010
@@ -41,6 +41,9 @@
template.sendBodyAndHeader("direct:start", "150", "foo", "A");
template.sendBodyAndHeader("direct:start", "200", "foo", "B");
template.sendBodyAndHeader("direct:start", "180", "foo", "B");
+
+ // to force B to timeout first as A is added last
+ Thread.sleep(100);
template.sendBodyAndHeader("direct:start", "120", "foo", "A");
assertMockEndpointsSatisfied();
@@ -50,18 +53,19 @@
assertEquals(3, grouped.size());
- assertEquals("100", grouped.get(0).getIn().getBody(String.class));
- assertEquals("150", grouped.get(1).getIn().getBody(String.class));
- assertEquals("120", grouped.get(2).getIn().getBody(String.class));
+ // B timeout first
+ assertEquals("130", grouped.get(0).getIn().getBody(String.class));
+ assertEquals("200", grouped.get(1).getIn().getBody(String.class));
+ assertEquals("180", grouped.get(2).getIn().getBody(String.class));
out = result.getExchanges().get(1);
grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
assertEquals(3, grouped.size());
- assertEquals("130", grouped.get(0).getIn().getBody(String.class));
- assertEquals("200", grouped.get(1).getIn().getBody(String.class));
- assertEquals("180", grouped.get(2).getIn().getBody(String.class));
+ assertEquals("100", grouped.get(0).getIn().getBody(String.class));
+ assertEquals("150", grouped.get(1).getIn().getBody(String.class));
+ assertEquals("120", grouped.get(2).getIn().getBody(String.class));
// END SNIPPET: e2
}
@@ -74,10 +78,10 @@
from("direct:start")
// aggregate all using the foo header
.aggregate().header("foo")
- // wait for 1 seconds to aggregate
- .batchTimeout(1000L)
// group the exchanges so we get one single exchange containing all the others
.groupExchanges()
+ // wait for 1 second to aggregate
+ .completionTimeout(1000L)
.to("mock:result");
// END SNIPPET: e1
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java Wed Feb 17 08:27:25 2010
@@ -67,7 +67,7 @@
// aggregate all using same expression
.aggregate().constant(true)
// wait for 0.5 seconds to aggregate
- .batchTimeout(500L)
+ .completionTimeout(500L)
// group the exchanges so we get one single exchange containing all the others
.groupExchanges()
.to("mock:result");
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateLostGroupIssueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateLostGroupIssueTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateLostGroupIssueTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateLostGroupIssueTest.java Wed Feb 17 08:27:25 2010
@@ -68,7 +68,7 @@
oldExchange.getIn().setBody(oldBody + "," + newBody);
return oldExchange;
}
- }).batchSize(10).batchTimeout(2000L)
+ }).completionSize(10).completionTimeout(2000L)
.to("log:aggregated")
.to("mock:result");
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateMultipleSourceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateMultipleSourceTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateMultipleSourceTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateMultipleSourceTest.java Wed Feb 17 08:27:25 2010
@@ -56,7 +56,7 @@
from("seda:baz").to("direct:aggregate");
from("direct:aggregate")
- .aggregate(header("type"), new MyAggregationStrategy()).batchSize(25).batchTimeout(5000)
+ .aggregate(header("type"), new MyAggregationStrategy()).completionSize(25).completionTimeout(5000)
.to("mock:result")
.end();
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Wed Feb 17 08:27:25 2010
@@ -139,7 +139,7 @@
AggregationStrategy as = new BodyInAggregatingStrategy();
AggregateProcessor ap = new AggregateProcessor(done, corr, as);
- ap.setCompletionAggregatedSize(3);
+ ap.setCompletionSize(3);
ap.setEagerCheckCompletion(eager);
ap.start();
@@ -365,35 +365,45 @@
AggregationStrategy as = new BodyInAggregatingStrategy();
AggregateProcessor ap = new AggregateProcessor(done, corr, as);
- ap.setCompletionAggregatedSize(100);
- ap.setUseBatchSizeFromConsumer(true);
+ ap.setCompletionSize(100);
+ ap.setCompletionFromBatchConsumer(true);
ap.start();
Exchange e1 = new DefaultExchange(context);
e1.getIn().setBody("A");
e1.getIn().setHeader("id", 123);
+ e1.setProperty(Exchange.BATCH_INDEX, 0);
e1.setProperty(Exchange.BATCH_SIZE, 2);
+ e1.setProperty(Exchange.BATCH_COMPLETE, false);
Exchange e2 = new DefaultExchange(context);
e2.getIn().setBody("B");
e2.getIn().setHeader("id", 123);
+ e2.setProperty(Exchange.BATCH_INDEX, 1);
e2.setProperty(Exchange.BATCH_SIZE, 2);
+ e2.setProperty(Exchange.BATCH_COMPLETE, true);
Exchange e3 = new DefaultExchange(context);
e3.getIn().setBody("C");
e3.getIn().setHeader("id", 123);
+ e3.setProperty(Exchange.BATCH_INDEX, 0);
e3.setProperty(Exchange.BATCH_SIZE, 3);
+ e3.setProperty(Exchange.BATCH_COMPLETE, false);
Exchange e4 = new DefaultExchange(context);
e4.getIn().setBody("D");
e4.getIn().setHeader("id", 123);
+ e4.setProperty(Exchange.BATCH_INDEX, 1);
e4.setProperty(Exchange.BATCH_SIZE, 3);
+ e4.setProperty(Exchange.BATCH_COMPLETE, false);
Exchange e5 = new DefaultExchange(context);
e5.getIn().setBody("E");
e5.getIn().setHeader("id", 123);
+ e5.setProperty(Exchange.BATCH_INDEX, 2);
e5.setProperty(Exchange.BATCH_SIZE, 3);
+ e5.setProperty(Exchange.BATCH_COMPLETE, true);
ap.process(e1);
ap.process(e2);
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java Wed Feb 17 08:27:25 2010
@@ -19,6 +19,7 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
/**
* Unit test to verify that aggregate by timeout only also works.
@@ -29,7 +30,7 @@
public void testAggregateTimeoutOnly() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
- // by default the use latest aggregatation strategy is used so we get message 9
+ // by default the use latest aggregation strategy is used so we get message 9
result.expectedBodiesReceived("Message 9");
// should take 3 seconds to complete this one
result.setMinimumResultWaitTime(2500);
@@ -49,7 +50,7 @@
// START SNIPPET: e1
from("direct:start")
// aggregate every 3th second and disable the batch size so we set it to 0
- .aggregate(header("id")).batchTimeout(3000).batchSize(0)
+ .aggregate(header("id"), new UseLatestAggregationStrategy()).completionTimeout(3000).completionSize(0)
.to("mock:result");
// END SNIPPET: e1
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorAndOnExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorAndOnExceptionTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorAndOnExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorAndOnExceptionTest.java Wed Feb 17 08:27:25 2010
@@ -22,7 +22,6 @@
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.processor.aggregate.PredicateAggregationCollection;
/**
* Unit test inspired by user forum.
@@ -49,14 +48,12 @@
onException(CamelException.class).maximumRedeliveries(2);
from("seda:start")
- .aggregate(new PredicateAggregationCollection(header("id"),
+ .aggregate(header("id"),
new AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
return newExchange;
}
- },
- property(Exchange.AGGREGATED_SIZE).isEqualTo(2)))
- .batchTimeout(500L)
+ }).completionSize(2).completionTimeout(500L)
.to("mock:result");
}
};
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBeanThrowExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBeanThrowExceptionTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBeanThrowExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBeanThrowExceptionTest.java Wed Feb 17 08:27:25 2010
@@ -18,6 +18,7 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
/**
* Unit test with timer, splitter, aggregator and tracer.
@@ -35,7 +36,8 @@
public void configure() throws Exception {
from("direct:start").
- aggregate(header("id")).
+ aggregate(header("id"), new UseLatestAggregationStrategy()).
+ completionTimeout(2000L).
bean(AggregatorBeanThrowExceptionTest.class, "fooDoesNotExistMethod").
to("log:foo");
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java Wed Feb 17 08:27:25 2010
@@ -90,7 +90,7 @@
LOG.debug("Index: " + newIndex + ". Total so far: " + total);
return answer;
}
- }).batchTimeout(60000).completionPredicate(property(Exchange.AGGREGATED_SIZE).isEqualTo(100))
+ }).completionTimeout(60000).completionPredicate(property(Exchange.AGGREGATED_SIZE).isEqualTo(100))
.to("direct:foo");
from("direct:foo").setBody().header("total").to("mock:result");
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionHandleTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionHandleTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionHandleTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionHandleTest.java Wed Feb 17 08:27:25 2010
@@ -21,6 +21,7 @@
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
/**
* Based on CAMEL-1546
@@ -63,7 +64,7 @@
onException(IllegalArgumentException.class).handled(true).to("mock:handled");
from("direct:start")
- .aggregate(header("id"))
+ .aggregate(header("id"), new UseLatestAggregationStrategy()).completionTimeout(1000L)
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
String body = exchange.getIn().getBody(String.class);
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java Wed Feb 17 08:27:25 2010
@@ -22,6 +22,7 @@
import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
/**
* Based on CAMEL-1546
@@ -65,7 +66,7 @@
from("direct:start")
.aggregate(header("id"))
- .batchTimeout(500)
+ .completionTimeout(500)
.aggregationStrategy(new AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
@@ -87,8 +88,8 @@
}
return ExpressionBuilder.headerExpression("id").evaluate(exchange, type);
}
- })
- .batchTimeout(500)
+ }, new UseLatestAggregationStrategy())
+ .completionTimeout(500)
.to("mock:result");
}
};
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionTest.java Wed Feb 17 08:27:25 2010
@@ -21,6 +21,7 @@
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
public class AggregatorExceptionTest extends ContextTestSupport {
@@ -45,8 +46,8 @@
errorHandler(deadLetterChannel("mock:error"));
from("direct:start")
- .aggregate(header("id"))
- .batchSize(5)
+ .aggregate(header("id"), new UseLatestAggregationStrategy())
+ .completionSize(5)
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
throw new java.lang.NoSuchMethodError(exceptionString);
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTimerAndTracerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTimerAndTracerTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTimerAndTracerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTimerAndTracerTest.java Wed Feb 17 08:27:25 2010
@@ -19,6 +19,7 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
/**
* Unit test with timer, splitter, aggregator and tracer.
@@ -39,7 +40,7 @@
getContext().setTracing(true);
from("seda:splitted").
- aggregate(header("id")).
+ aggregate(header("id"), new UseLatestAggregationStrategy()).completionTimeout(1000L).
to("mock:foo").
to("mock:result");
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java Wed Feb 17 08:27:25 2010
@@ -158,7 +158,7 @@
from("direct:joinSurnames")
.aggregate(header(SURNAME_HEADER),
- surnameAggregator).setHeader(TYPE_HEADER,
+ surnameAggregator).completionTimeout(2000L).setHeader(TYPE_HEADER,
constant(BROTHERS_TYPE)).to("direct:joinBrothers");
// Join all brothers lists and remove surname and type headers
@@ -166,7 +166,7 @@
from("direct:joinBrothers").aggregate(header(TYPE_HEADER),
brothersAggregator);
- agg.setBatchTimeout(2000L);
+ agg.setCompletionTimeout(2000L);
agg.removeHeader(SURNAME_HEADER)
.removeHeader(TYPE_HEADER)
.to("mock:result");
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java Wed Feb 17 08:27:25 2010
@@ -32,8 +32,7 @@
MockEndpoint result = getMockEndpoint("mock:result");
// we expect to find the two winners with the highest bid
- result.expectedMessageCount(2);
- result.expectedBodiesReceived("200", "150");
+ result.expectedBodiesReceivedInAnyOrder("200", "150");
// then we send all the messages at once
template.sendBodyAndHeader("direct:start", "100", "id", "1");
@@ -55,8 +54,8 @@
from("direct:start")
// aggregated by header id and use our own strategy how to aggregate
.aggregate(new MyAggregationStrategy()).header("id")
- // wait for 0.5 seconds to aggregate
- .batchTimeout(500L)
+ // wait for 1 second to aggregate
+ .completionTimeout(1000L)
.to("mock:result");
// END SNIPPET: e1
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java Wed Feb 17 08:27:25 2010
@@ -17,9 +17,9 @@
package org.apache.camel.processor.aggregator;
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
/**
* Unit test for DefaultAggregatorCollection.
@@ -32,11 +32,7 @@
// we expect 4 messages grouped by the latest message only
result.expectedMessageCount(4);
- result.expectedBodiesReceived("Message 1d", "Message 2b", "Message 3c", "Message 4");
- result.message(0).property(Exchange.AGGREGATED_SIZE).isEqualTo(4);
- result.message(1).property(Exchange.AGGREGATED_SIZE).isEqualTo(2);
- result.message(2).property(Exchange.AGGREGATED_SIZE).isEqualTo(3);
- result.message(3).property(Exchange.AGGREGATED_SIZE).isEqualTo(1);
+ result.expectedBodiesReceivedInAnyOrder("Message 1d", "Message 2b", "Message 3c", "Message 4");
// then we send all the messages at once
template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
@@ -64,9 +60,9 @@
// aggregated by header id
// as we have not configured more on the aggregator it will default to aggregate the
// latest exchange only
- .aggregate().header("id")
+ .aggregate().header("id").aggregationStrategy(new UseLatestAggregationStrategy())
// wait for 0.5 seconds to aggregate
- .batchTimeout(500L)
+ .completionTimeout(500L)
.to("mock:result");
// END SNIPPET: e1
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java Wed Feb 17 08:27:25 2010
@@ -18,11 +18,8 @@
package org.apache.camel.processor.aggregator;
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.aggregate.AggregationCollection;
-import org.apache.camel.processor.aggregate.PredicateAggregationCollection;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
/**
@@ -60,20 +57,11 @@
return new RouteBuilder() {
public void configure() throws Exception {
// START SNIPPET: e1
- // create the aggregation collection we will use.
- // - we will correlate the received message based on the id header
- // - as we will just keep the latest message we use the latest strategy
- // - and finally we stop aggregate if we receive 2 or more messages
- AggregationCollection ag = new PredicateAggregationCollection(header("id"),
- new UseLatestAggregationStrategy(),
- property(Exchange.AGGREGATED_SIZE).isEqualTo(3));
-
// our route is aggregating from the direct queue and sending the response to the mock
from("direct:start")
// we use the collection based aggregator we already have configured
- .aggregate(ag)
- // wait for 0.5 seconds to aggregate
- .batchTimeout(500L)
+ .aggregate(header("id"), new UseLatestAggregationStrategy())
+ .completionSize(3)
.to("mock:result");
// END SNIPPET: e1
}