You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/14 10:59:12 UTC

svn commit: r909995 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/aggregator/

Author: davsclaus
Date: Sun Feb 14 09:59:12 2010
New Revision: 909995

URL: http://svn.apache.org/viewvc?rev=909995&view=rev
Log:
CAMEL-1686: Aggregator now lets the completion predicate be evaluated on the fly, which allows the predicate to trigger before the batch timeout.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=909995&r1=909994&r2=909995&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Sun Feb 14 09:59:12 2010
@@ -902,6 +902,7 @@
             return;
         }
 
+        // super will invoke doStart which will prepare internal services before we continue and start the routes below
         super.start();
 
         LOG.debug("Starting routes...");
@@ -972,6 +973,7 @@
             for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) {
                 Integer order = entry.getKey();
                 Route route = entry.getValue().getRoute();
+
                 RouteService routeService = entry.getValue().getRouteService();
                 for (Consumer consumer : routeService.getInputs().values()) {
                     Endpoint endpoint = consumer.getEndpoint();
@@ -1028,7 +1030,7 @@
         } catch (Exception e) {
             // fire event that we failed to start
             EventHelper.notifyCamelContextStartupFailed(this, e);
-            // rethrown cause
+            // rethrow cause
             throw e;
         }
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java?rev=909995&r1=909994&r2=909995&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java Sun Feb 14 09:59:12 2010
@@ -54,6 +54,7 @@
                       Predicate aggregationCompletedPredicate) {
         this(processor, new PredicateAggregationCollection(correlationExpression, aggregationStrategy, aggregationCompletedPredicate));
         this.correlationExpression = correlationExpression;
+        setCompletionPredicate(aggregationCompletedPredicate);
     }
 
     public Aggregator(Processor processor, AggregationCollection collection) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=909995&r1=909994&r2=909995&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Sun Feb 14 09:59:12 2010
@@ -22,6 +22,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -30,6 +31,7 @@
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
+import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
@@ -47,6 +49,8 @@
  */
 public class BatchProcessor extends ServiceSupport implements Processor, Navigate<Processor> {
 
+    // TODO: Should aggregate on the fly as well
+
     public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
     public static final int DEFAULT_BATCH_SIZE = 100;
 
@@ -57,6 +61,7 @@
     private int outBatchSize;
     private boolean groupExchanges;
     private boolean batchConsumer;
+    private Predicate completionPredicate;
 
     private final Processor processor;
     private final Collection<Exchange> collection;
@@ -154,6 +159,14 @@
         this.batchConsumer = batchConsumer;
     }
 
+    public Predicate getCompletionPredicate() {
+        return completionPredicate;
+    }
+
+    public void setCompletionPredicate(Predicate completionPredicate) {
+        this.completionPredicate = completionPredicate;
+    }
+
     public Processor getProcessor() {
         return processor;
     }
@@ -198,7 +211,7 @@
     protected void processExchange(Exchange exchange) throws Exception {
         processor.process(exchange);
         if (exchange.getException() != null) {
-            getExceptionHandler().handleException("Error processing Exchange: " + exchange, exchange.getException());
+            getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, exchange.getException());
         }
     }
 
@@ -242,6 +255,7 @@
         private Queue<Exchange> queue;
         private Lock queueLock = new ReentrantLock();
         private boolean exchangeEnqueued;
+        private final Queue<String> completionPredicateMatched = new ConcurrentLinkedQueue<String>();
         private Condition exchangeEnqueuedCondition = queueLock.newCondition();
 
         public BatchSender() {
@@ -278,17 +292,38 @@
                 do {
                     try {
                         if (!exchangeEnqueued) {
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("Waiting for new exchange to arrive or batchTimeout to occur after " + batchTimeout + " ms.");
+                            }
                             exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
                         }
 
-                        if (!exchangeEnqueued) {
-                            drainQueueTo(collection, batchSize);
-                        } else {             
+                        // if the completion predicate was triggered then there is an exchange id which denotes when to complete
+                        String id = null;
+                        if (!completionPredicateMatched.isEmpty()) {
+                            id = completionPredicateMatched.poll();
+                        }
+
+                        if (id != null || !exchangeEnqueued) {
+                            if (LOG.isTraceEnabled()) {
+                                if (id != null) {
+                                    LOG.trace("Collecting exchanges to be aggregated triggered by completion predicate");
+                                } else {
+                                    LOG.trace("Collecting exchanges to be aggregated triggered by batch timeout");
+                                }
+                            }
+                            drainQueueTo(collection, batchSize, id);
+                        } else {
                             exchangeEnqueued = false;
+                            boolean drained = false;
                             while (isInBatchCompleted(queue.size())) {
-                                drainQueueTo(collection, batchSize);
+                                drained = true;
+                                drainQueueTo(collection, batchSize, id);
                             }
-                            
+                            if (drained) {
+                                LOG.trace("Collecting exchanges to be aggregated triggered by new exchanges received");
+                            }
+
                             if (!isOutBatchCompleted()) {
                                 continue;
                             }
@@ -320,7 +355,7 @@
         /**
          * This method should be called with queueLock held
          */
-        private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
+        private void drainQueueTo(Collection<Exchange> collection, int batchSize, String exchangeId) {
             for (int i = 0; i < batchSize; ++i) {
                 Exchange e = queue.poll();
                 if (e != null) {
@@ -331,6 +366,10 @@
                     } catch (Throwable t) {
                         getExceptionHandler().handleException(t);
                     }
+                    if (exchangeId != null && exchangeId.equals(e.getExchangeId())) {
+                        // this batch is complete so stop draining
+                        break;
+                    }
                 } else {
                     break;
                 }
@@ -342,8 +381,22 @@
         }
 
         public void enqueueExchange(Exchange exchange) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Received exchange to be batched: " + exchange);
+            }
             queueLock.lock();
             try {
+                // pre-test whether the exchange matches the completion predicate
+                if (completionPredicate != null) {
+                    boolean matches = completionPredicate.matches(exchange);
+                    if (matches) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Exchange matched completion predicate: " + exchange);
+                        }
+                        // add this exchange id to the queue of ids which mark a batch as complete
+                        completionPredicateMatched.add(exchange.getExchangeId());
+                    }
+                }
                 queue.add(exchange);
                 exchangeEnqueued = true;
                 exchangeEnqueuedCondition.signal();
@@ -359,10 +412,13 @@
                 Exchange exchange = iter.next();
                 iter.remove();
                 try {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Sending aggregated exchange: " + exchange);
+                    }
                     processExchange(exchange);
                 } catch (Throwable t) {
                     // must catch throwable to avoid growing memory
-                    getExceptionHandler().handleException("Error processing Exchange: " + exchange, t);
+                    getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, t);
                 }
             }
         }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java?rev=909995&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java Sun Feb 14 09:59:12 2010
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+
+/**
+ * @version $Revision$
+ */
+public class AggregateCompletionPredicateTest extends ContextTestSupport {
+
+    public void testCompletionPredicateBeforeTimeout() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedBodiesReceived("A+B+C+END");
+        // should be faster than 10 seconds
+        mock.setResultWaitTime(10000);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "B", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "C", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testMultipleCompletionPredicateBeforeTimeout() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedBodiesReceived("A+B+C+END", "D+E+END", "F+G+H+I+END");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "B", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "C", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+        template.sendBodyAndHeader("direct:start", "D", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "E", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+        template.sendBodyAndHeader("direct:start", "F", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "G", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "H", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "I", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testCompletionPredicateBeforeTimeoutTwoGroups() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedBodiesReceived("A+B+C+END", "1+2+3+4+END");
+        // should be faster than 10 seconds
+        mock.setResultWaitTime(10000);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "1", "id", "bar");
+        template.sendBodyAndHeader("direct:start", "2", "id", "bar");
+        template.sendBodyAndHeader("direct:start", "B", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "C", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "3", "id", "bar");
+        template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+        template.sendBodyAndHeader("direct:start", "4", "id", "bar");
+        template.sendBodyAndHeader("direct:start", "END", "id", "bar");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testMultipleCompletionPredicateBeforeTimeoutTwoGroups() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedBodiesReceived("A+B+C+END", "1+2+3+4+END", "5+6+END", "D+E+END", "7+8+END", "F+G+H+I+END");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "B", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "C", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "1", "id", "bar");
+        template.sendBodyAndHeader("direct:start", "2", "id", "bar");
+        template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+        template.sendBodyAndHeader("direct:start", "D", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "3", "id", "bar");
+        template.sendBodyAndHeader("direct:start", "4", "id", "bar");
+        template.sendBodyAndHeader("direct:start", "END", "id", "bar");
+
+        template.sendBodyAndHeader("direct:start", "5", "id", "bar");
+        template.sendBodyAndHeader("direct:start", "6", "id", "bar");
+        template.sendBodyAndHeader("direct:start", "E", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "END", "id", "bar");
+
+        template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+        template.sendBodyAndHeader("direct:start", "F", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "7", "id", "bar");
+        template.sendBodyAndHeader("direct:start", "G", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "H", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "8", "id", "bar");
+        template.sendBodyAndHeader("direct:start", "END", "id", "bar");
+
+        template.sendBodyAndHeader("direct:start", "I", "id", "foo");
+        template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                            .completionPredicate(body().contains("END")).batchTimeout(20000)
+                        .to("mock:aggregated");
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date