You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/11/05 16:21:29 UTC

svn commit: r592042 - in /activemq/camel/trunk/camel-core/src/main: java/org/apache/camel/model/ java/org/apache/camel/processor/ java/org/apache/camel/processor/aggregate/ resources/org/apache/camel/model/

Author: jstrachan
Date: Mon Nov  5 07:21:27 2007
New Revision: 592042

URL: http://svn.apache.org/viewvc?rev=592042&view=rev
Log:
added a spike to implement http://issues.apache.org/activemq/browse/CAMEL-207 to allow predicates to be used to determine if the end of the aggregation has been reached

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/CompletedPredicate.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java   (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
    activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java?rev=592042&r1=592041&r2=592042&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java Mon Nov  5 07:21:27 2007
@@ -21,12 +21,14 @@
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
+import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.impl.RouteContext;
@@ -49,6 +51,8 @@
     private Long batchTimeout;
     @XmlAttribute(required = false)
     private String strategyRef;
+    @XmlElement(name="completedPredicate", required = false)
+    private CompletedPredicate completedPredicate;
 
     public AggregatorType() {
     }
@@ -75,6 +79,7 @@
     public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
         Endpoint from = routeContext.getEndpoint();
         final Processor processor = routeContext.createProcessor(this);
+
         AggregationStrategy strategy = getAggregationStrategy();
         if (strategy == null && strategyRef != null) {
             strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
@@ -82,8 +87,19 @@
         if (strategy == null) {
             strategy = new UseLatestAggregationStrategy();
         }
-        final Aggregator service = new Aggregator(from, processor, getExpression()
-                .createExpression(routeContext), strategy);
+        Expression aggregateExpression = getExpression().createExpression(routeContext);
+
+        Predicate predicate = null;
+        if (completedPredicate != null) {
+            predicate = completedPredicate.createPredicate(routeContext);
+        }
+        final Aggregator service;
+        if (predicate != null) {
+            service = new Aggregator(from, processor, aggregateExpression, strategy, predicate);
+        }
+        else {
+            service = new Aggregator(from, processor, aggregateExpression, strategy);
+        }
 
         if (batchSize != null) {
             service.setBatchSize(batchSize);
@@ -134,6 +150,14 @@
         this.strategyRef = strategyRef;
     }
 
+    public CompletedPredicate getCompletePredicate() {
+        return completedPredicate;
+    }
+
+    public void setCompletePredicate(CompletedPredicate completedPredicate) {
+        this.completedPredicate = completedPredicate;
+    }
+
     // Fluent API
     //-------------------------------------------------------------------------
     public AggregatorType batchSize(int batchSize) {
@@ -145,4 +169,5 @@
         setBatchTimeout(batchTimeout);
         return this;
     }
+    
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/CompletedPredicate.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/CompletedPredicate.java?rev=592042&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/CompletedPredicate.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/CompletedPredicate.java Mon Nov  5 07:21:27 2007
@@ -0,0 +1,71 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElementRef;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.Predicate;
+import org.apache.camel.impl.RouteContext;
+import org.apache.camel.model.language.ExpressionType;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+@XmlRootElement(name = "completedPredicate")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class CompletedPredicate {
+    @XmlElementRef
+    private ExpressionType completePredicate;
+    @XmlTransient
+    private Predicate predicate;
+
+    public CompletedPredicate() {
+    }
+
+    public CompletedPredicate(Predicate predicate) {
+        this.predicate = predicate;
+    }
+
+    public ExpressionType getCompletePredicate() {
+        return completePredicate;
+    }
+
+    public void setCompletePredicate(ExpressionType completePredicate) {
+        this.completePredicate = completePredicate;
+    }
+
+    public Predicate getPredicate() {
+        return predicate;
+    }
+
+    public void setPredicate(Predicate predicate) {
+        this.predicate = predicate;
+    }
+
+    public Predicate createPredicate(RouteContext routeContext) {
+        ExpressionType predicateType = getCompletePredicate();
+        if (predicateType != null && predicate == null) {
+            predicate = predicateType.createPredicate(routeContext);
+        }
+        return predicate;
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/CompletedPredicate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java?rev=592042&r1=592041&r2=592042&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java Mon Nov  5 07:21:27 2007
@@ -18,9 +18,11 @@
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Expression;
+import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.aggregate.AggregationCollection;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.PredicateAggregationCollection;
 
 /**
  * An implementation of the <a
@@ -43,11 +45,19 @@
  *                <code>header("JMSCorrelationID")</code>
  */
 public class Aggregator extends BatchProcessor {
+    private Predicate aggregationCompletedPredicate;
+
     public Aggregator(Endpoint endpoint, Processor processor, Expression correlationExpression,
                       AggregationStrategy aggregationStrategy) {
         this(endpoint, processor, new AggregationCollection(correlationExpression, aggregationStrategy));
     }
 
+    public Aggregator(Endpoint endpoint, Processor processor, Expression correlationExpression,
+                      AggregationStrategy aggregationStrategy, Predicate aggregationCompletedPredicate) {
+        this(endpoint, processor, new PredicateAggregationCollection(correlationExpression, aggregationStrategy, aggregationCompletedPredicate));
+        this.aggregationCompletedPredicate = aggregationCompletedPredicate;
+    }
+
     public Aggregator(Endpoint endpoint, Processor processor, AggregationCollection collection) {
         super(endpoint, processor, collection);
     }
@@ -55,5 +65,15 @@
     @Override
     public String toString() {
         return "Aggregator[to: " + getProcessor() + "]";
+    }
+
+    @Override
+    protected boolean isBatchCompleted(int index) {
+        if (aggregationCompletedPredicate != null) {
+            if (getCollection().size() > 0) {
+                return true;
+            };
+        }
+        return super.isBatchCompleted(index);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=592042&r1=592041&r2=592042&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Mon Nov  5 07:21:27 2007
@@ -116,7 +116,7 @@
     protected synchronized void processBatch() throws Exception {
         long start = System.currentTimeMillis();
         long end = start + batchTimeout;
-        for (int i = 0; i < batchSize; i++) {
+        for (int i = 0; isBatchCompleted(i); i++) {
             long timeout = end - System.currentTimeMillis();
 
             Exchange exchange = consumer.receive(timeout);
@@ -138,6 +138,13 @@
             iter.remove();
             processExchange(exchange);
         }
+    }
+
+    /**
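+     * A strategy method evaluated as the loop condition in processBatch(): returns true while more exchanges should be collected, and false once the batch is completed and the resulting exchanges should be sent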
+     */
+    protected boolean isBatchCompleted(int index) {
+        return index < batchSize;
     }
 
     /**

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java?rev=592042&r1=592041&r2=592042&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java Mon Nov  5 07:21:27 2007
@@ -17,6 +17,7 @@
 package org.apache.camel.processor.aggregate;
 
 import java.util.AbstractCollection;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -54,6 +55,7 @@
         // the strategy may just update the old exchange and return it
         if (newExchange != oldExchange) {
             map.put(correlationKey, newExchange);
+            onAggregation(correlationKey, newExchange);
         }
         return true;
     }
@@ -64,5 +66,17 @@
 
     public int size() {
         return map.size();
+    }
+
+    @Override
+    public void clear() {
+        map.clear();
+    }
+
+    /**
+     * A strategy method allowing derived classes such as {@link PredicateAggregationCollection}
+     * to check to see if the aggregation has completed
+     */
+    protected void onAggregation(Object correlationKey, Exchange newExchange) {
     }
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java?rev=592042&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java Mon Nov  5 07:21:27 2007
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Predicate;
+
+/**
+ * An aggregator collection which uses a predicate to decide when an aggregation is completed for
+ * a particular correlation key
+ *
+ * @version $Revision: 1.1 $
+ */
+public class PredicateAggregationCollection extends AggregationCollection {
+    private Predicate aggregationCompletedPredicate;
+    private List<Exchange> collection = new ArrayList<Exchange>();
+
+    public PredicateAggregationCollection(Expression<Exchange> correlationExpression, AggregationStrategy aggregationStrategy, Predicate aggregationCompletedPredicate) {
+        super(correlationExpression, aggregationStrategy);
+        this.aggregationCompletedPredicate = aggregationCompletedPredicate;
+    }
+
+    @Override
+    protected void onAggregation(Object correlationKey, Exchange newExchange) {
+        if (aggregationCompletedPredicate.matches(newExchange)) {
+            // this exchange has now completed its aggregation, so let's add it to the
+            // collection of things to send
+            collection.add(newExchange);
+        }
+    }
+
+    @Override
+    public Iterator<Exchange> iterator() {
+        return collection.iterator();
+    }
+
+    @Override
+    public int size() {
+        return collection.size();
+    }
+
+    @Override
+    public void clear() {
+        collection.clear();
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index?rev=592042&r1=592041&r2=592042&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index (original)
+++ activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index Mon Nov  5 07:21:27 2007
@@ -18,6 +18,7 @@
 BeanRef
 CatchType
 ChoiceType
+CompletedPredicate
 DelayerType
 ExceptionType
 FilterType