You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/11/05 16:21:29 UTC
svn commit: r592042 - in /activemq/camel/trunk/camel-core/src/main:
java/org/apache/camel/model/ java/org/apache/camel/processor/
java/org/apache/camel/processor/aggregate/ resources/org/apache/camel/model/
Author: jstrachan
Date: Mon Nov 5 07:21:27 2007
New Revision: 592042
URL: http://svn.apache.org/viewvc?rev=592042&view=rev
Log:
added a spike to implement http://issues.apache.org/activemq/browse/CAMEL-207 to allow predicates to be used to determine if the end of the aggregation has been reached
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/CompletedPredicate.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java (with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java?rev=592042&r1=592041&r2=592042&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java Mon Nov 5 07:21:27 2007
@@ -21,12 +21,14 @@
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
+import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.impl.RouteContext;
@@ -49,6 +51,8 @@
private Long batchTimeout;
@XmlAttribute(required = false)
private String strategyRef;
+ @XmlElement(name="completedPredicate", required = false)
+ private CompletedPredicate completedPredicate;
public AggregatorType() {
}
@@ -75,6 +79,7 @@
public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
Endpoint from = routeContext.getEndpoint();
final Processor processor = routeContext.createProcessor(this);
+
AggregationStrategy strategy = getAggregationStrategy();
if (strategy == null && strategyRef != null) {
strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
@@ -82,8 +87,19 @@
if (strategy == null) {
strategy = new UseLatestAggregationStrategy();
}
- final Aggregator service = new Aggregator(from, processor, getExpression()
- .createExpression(routeContext), strategy);
+ Expression aggregateExpression = getExpression().createExpression(routeContext);
+
+ Predicate predicate = null;
+ if (completedPredicate != null) {
+ predicate = completedPredicate.createPredicate(routeContext);
+ }
+ final Aggregator service;
+ if (predicate != null) {
+ service = new Aggregator(from, processor, aggregateExpression, strategy, predicate);
+ }
+ else {
+ service = new Aggregator(from, processor, aggregateExpression, strategy);
+ }
if (batchSize != null) {
service.setBatchSize(batchSize);
@@ -134,6 +150,14 @@
this.strategyRef = strategyRef;
}
+ public CompletedPredicate getCompletePredicate() {
+ return completedPredicate;
+ }
+
+ public void setCompletePredicate(CompletedPredicate completedPredicate) {
+ this.completedPredicate = completedPredicate;
+ }
+
// Fluent API
//-------------------------------------------------------------------------
public AggregatorType batchSize(int batchSize) {
@@ -145,4 +169,5 @@
setBatchTimeout(batchTimeout);
return this;
}
+
}
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/CompletedPredicate.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/CompletedPredicate.java?rev=592042&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/CompletedPredicate.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/CompletedPredicate.java Mon Nov 5 07:21:27 2007
@@ -0,0 +1,71 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElementRef;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.Predicate;
+import org.apache.camel.impl.RouteContext;
+import org.apache.camel.model.language.ExpressionType;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+@XmlRootElement(name = "completedPredicate")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class CompletedPredicate {
+ @XmlElementRef
+ private ExpressionType completePredicate;
+ @XmlTransient
+ private Predicate predicate;
+
+ public CompletedPredicate() {
+ }
+
+ public CompletedPredicate(Predicate predicate) {
+ this.predicate = predicate;
+ }
+
+ public ExpressionType getCompletePredicate() {
+ return completePredicate;
+ }
+
+ public void setCompletePredicate(ExpressionType completePredicate) {
+ this.completePredicate = completePredicate;
+ }
+
+ public Predicate getPredicate() {
+ return predicate;
+ }
+
+ public void setPredicate(Predicate predicate) {
+ this.predicate = predicate;
+ }
+
+ public Predicate createPredicate(RouteContext routeContext) {
+ ExpressionType predicateType = getCompletePredicate();
+ if (predicateType != null && predicate == null) {
+ predicate = predicateType.createPredicate(routeContext);
+ }
+ return predicate;
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/CompletedPredicate.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java?rev=592042&r1=592041&r2=592042&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java Mon Nov 5 07:21:27 2007
@@ -18,9 +18,11 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Expression;
+import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.processor.aggregate.AggregationCollection;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.PredicateAggregationCollection;
/**
* An implementation of the <a
@@ -43,11 +45,19 @@
* <code>header("JMSCorrelationID")</code>
*/
public class Aggregator extends BatchProcessor {
+ private Predicate aggregationCompletedPredicate;
+
public Aggregator(Endpoint endpoint, Processor processor, Expression correlationExpression,
AggregationStrategy aggregationStrategy) {
this(endpoint, processor, new AggregationCollection(correlationExpression, aggregationStrategy));
}
+ public Aggregator(Endpoint endpoint, Processor processor, Expression correlationExpression,
+ AggregationStrategy aggregationStrategy, Predicate aggregationCompletedPredicate) {
+ this(endpoint, processor, new PredicateAggregationCollection(correlationExpression, aggregationStrategy, aggregationCompletedPredicate));
+ this.aggregationCompletedPredicate = aggregationCompletedPredicate;
+ }
+
public Aggregator(Endpoint endpoint, Processor processor, AggregationCollection collection) {
super(endpoint, processor, collection);
}
@@ -55,5 +65,15 @@
@Override
public String toString() {
return "Aggregator[to: " + getProcessor() + "]";
+ }
+
+ @Override
+ protected boolean isBatchCompleted(int index) {
+ if (aggregationCompletedPredicate != null) {
+ if (getCollection().size() > 0) {
+ return true;
+ };
+ }
+ return super.isBatchCompleted(index);
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=592042&r1=592041&r2=592042&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Mon Nov 5 07:21:27 2007
@@ -116,7 +116,7 @@
protected synchronized void processBatch() throws Exception {
long start = System.currentTimeMillis();
long end = start + batchTimeout;
- for (int i = 0; i < batchSize; i++) {
+ for (int i = 0; isBatchCompleted(i); i++) {
long timeout = end - System.currentTimeMillis();
Exchange exchange = consumer.receive(timeout);
@@ -138,6 +138,13 @@
iter.remove();
processExchange(exchange);
}
+ }
+
+ /**
+ * A strategy method to decide if the batch is completed the resulting exchanges should be sent
+ */
+ protected boolean isBatchCompleted(int index) {
+ return index < batchSize;
}
/**
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java?rev=592042&r1=592041&r2=592042&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java Mon Nov 5 07:21:27 2007
@@ -17,6 +17,7 @@
package org.apache.camel.processor.aggregate;
import java.util.AbstractCollection;
+import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -54,6 +55,7 @@
// the strategy may just update the old exchange and return it
if (newExchange != oldExchange) {
map.put(correlationKey, newExchange);
+ onAggregation(correlationKey, newExchange);
}
return true;
}
@@ -64,5 +66,17 @@
public int size() {
return map.size();
+ }
+
+ @Override
+ public void clear() {
+ map.clear();
+ }
+
+ /**
+ * A strategy method allowing derived classes such as {@link PredicateAggregationCollection}
+ * to check to see if the aggregation has completed
+ */
+ protected void onAggregation(Object correlationKey, Exchange newExchange) {
}
}
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java?rev=592042&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java Mon Nov 5 07:21:27 2007
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Predicate;
+
+/**
+ * An aggregator collection which uses a predicate to decide when an aggregation is completed for
+ * a particular correlation key
+ *
+ * @version $Revision: 1.1 $
+ */
+public class PredicateAggregationCollection extends AggregationCollection {
+ private Predicate aggregationCompletedPredicate;
+ private List<Exchange> collection = new ArrayList<Exchange>();
+
+ public PredicateAggregationCollection(Expression<Exchange> correlationExpression, AggregationStrategy aggregationStrategy, Predicate aggregationCompletedPredicate) {
+ super(correlationExpression, aggregationStrategy);
+ this.aggregationCompletedPredicate = aggregationCompletedPredicate;
+ }
+
+ @Override
+ protected void onAggregation(Object correlationKey, Exchange newExchange) {
+ if (aggregationCompletedPredicate.matches(newExchange)) {
+ // this exchange has now aggregated so lets add it to the collection of things
+ // to send
+ collection.add(newExchange);
+ }
+ }
+
+ @Override
+ public Iterator<Exchange> iterator() {
+ return collection.iterator();
+ }
+
+ @Override
+ public int size() {
+ return collection.size();
+ }
+
+ @Override
+ public void clear() {
+ collection.clear();
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index?rev=592042&r1=592041&r2=592042&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index (original)
+++ activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index Mon Nov 5 07:21:27 2007
@@ -18,6 +18,7 @@
BeanRef
CatchType
ChoiceType
+CompletedPredicate
DelayerType
ExceptionType
FilterType