You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/14 10:59:12 UTC
svn commit: r909995 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ main/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/aggregator/
Author: davsclaus
Date: Sun Feb 14 09:59:12 2010
New Revision: 909995
URL: http://svn.apache.org/viewvc?rev=909995&view=rev
Log:
CAMEL-1686: Aggregator now lets completion predicate being evaluated on the fly which allows the predicate to trigger before the batch timeout.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=909995&r1=909994&r2=909995&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Sun Feb 14 09:59:12 2010
@@ -902,6 +902,7 @@
return;
}
+ // super will invoke doStart which will prepare internal services before we continue and start the routes below
super.start();
LOG.debug("Starting routes...");
@@ -972,6 +973,7 @@
for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) {
Integer order = entry.getKey();
Route route = entry.getValue().getRoute();
+
RouteService routeService = entry.getValue().getRouteService();
for (Consumer consumer : routeService.getInputs().values()) {
Endpoint endpoint = consumer.getEndpoint();
@@ -1028,7 +1030,7 @@
} catch (Exception e) {
// fire event that we failed to start
EventHelper.notifyCamelContextStartupFailed(this, e);
- // rethrown cause
+ // rethrow cause
throw e;
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java?rev=909995&r1=909994&r2=909995&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java Sun Feb 14 09:59:12 2010
@@ -54,6 +54,7 @@
Predicate aggregationCompletedPredicate) {
this(processor, new PredicateAggregationCollection(correlationExpression, aggregationStrategy, aggregationCompletedPredicate));
this.correlationExpression = correlationExpression;
+ setCompletionPredicate(aggregationCompletedPredicate);
}
public Aggregator(Processor processor, AggregationCollection collection) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=909995&r1=909994&r2=909995&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Sun Feb 14 09:59:12 2010
@@ -22,6 +22,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -30,6 +31,7 @@
import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
+import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
@@ -47,6 +49,8 @@
*/
public class BatchProcessor extends ServiceSupport implements Processor, Navigate<Processor> {
+ // TODO: Should aggregate on the fly as well
+
public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
public static final int DEFAULT_BATCH_SIZE = 100;
@@ -57,6 +61,7 @@
private int outBatchSize;
private boolean groupExchanges;
private boolean batchConsumer;
+ private Predicate completionPredicate;
private final Processor processor;
private final Collection<Exchange> collection;
@@ -154,6 +159,14 @@
this.batchConsumer = batchConsumer;
}
+ public Predicate getCompletionPredicate() {
+ return completionPredicate;
+ }
+
+ public void setCompletionPredicate(Predicate completionPredicate) {
+ this.completionPredicate = completionPredicate;
+ }
+
public Processor getProcessor() {
return processor;
}
@@ -198,7 +211,7 @@
protected void processExchange(Exchange exchange) throws Exception {
processor.process(exchange);
if (exchange.getException() != null) {
- getExceptionHandler().handleException("Error processing Exchange: " + exchange, exchange.getException());
+ getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, exchange.getException());
}
}
@@ -242,6 +255,7 @@
private Queue<Exchange> queue;
private Lock queueLock = new ReentrantLock();
private boolean exchangeEnqueued;
+ private final Queue<String> completionPredicateMatched = new ConcurrentLinkedQueue<String>();
private Condition exchangeEnqueuedCondition = queueLock.newCondition();
public BatchSender() {
@@ -278,17 +292,38 @@
do {
try {
if (!exchangeEnqueued) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting for new exchange to arrive or batchTimeout to occur after " + batchTimeout + " ms.");
+ }
exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
}
- if (!exchangeEnqueued) {
- drainQueueTo(collection, batchSize);
- } else {
+ // if the completion predicate was triggered then there is an exchange id which denotes when to complete
+ String id = null;
+ if (!completionPredicateMatched.isEmpty()) {
+ id = completionPredicateMatched.poll();
+ }
+
+ if (id != null || !exchangeEnqueued) {
+ if (LOG.isTraceEnabled()) {
+ if (id != null) {
+ LOG.trace("Collecting exchanges to be aggregated triggered by completion predicate");
+ } else {
+ LOG.trace("Collecting exchanges to be aggregated triggered by batch timeout");
+ }
+ }
+ drainQueueTo(collection, batchSize, id);
+ } else {
exchangeEnqueued = false;
+ boolean drained = false;
while (isInBatchCompleted(queue.size())) {
- drainQueueTo(collection, batchSize);
+ drained = true;
+ drainQueueTo(collection, batchSize, id);
}
-
+ if (drained) {
+ LOG.trace("Collecting exchanges to be aggregated triggered by new exchanges received");
+ }
+
if (!isOutBatchCompleted()) {
continue;
}
@@ -320,7 +355,7 @@
/**
* This method should be called with queueLock held
*/
- private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
+ private void drainQueueTo(Collection<Exchange> collection, int batchSize, String exchangeId) {
for (int i = 0; i < batchSize; ++i) {
Exchange e = queue.poll();
if (e != null) {
@@ -331,6 +366,10 @@
} catch (Throwable t) {
getExceptionHandler().handleException(t);
}
+ if (exchangeId != null && exchangeId.equals(e.getExchangeId())) {
+ // this batch is complete so stop draining
+ break;
+ }
} else {
break;
}
@@ -342,8 +381,22 @@
}
public void enqueueExchange(Exchange exchange) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received exchange to be batched: " + exchange);
+ }
queueLock.lock();
try {
+ // pre test whether the completion predicate matched
+ if (completionPredicate != null) {
+ boolean matches = completionPredicate.matches(exchange);
+ if (matches) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Exchange matched completion predicate: " + exchange);
+ }
+ // add this exchange to the list of exchanges which marks the batch as complete
+ completionPredicateMatched.add(exchange.getExchangeId());
+ }
+ }
queue.add(exchange);
exchangeEnqueued = true;
exchangeEnqueuedCondition.signal();
@@ -359,10 +412,13 @@
Exchange exchange = iter.next();
iter.remove();
try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending aggregated exchange: " + exchange);
+ }
processExchange(exchange);
} catch (Throwable t) {
// must catch throwable to avoid growing memory
- getExceptionHandler().handleException("Error processing Exchange: " + exchange, t);
+ getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, t);
}
}
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java?rev=909995&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java Sun Feb 14 09:59:12 2010
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+
+/**
+ * @version $Revision$
+ */
+public class AggregateCompletionPredicateTest extends ContextTestSupport {
+
+ public void testCompletionPredicateBeforeTimeout() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("A+B+C+END");
+ // should be faster than 10 seconds
+ mock.setResultWaitTime(10000);
+
+ template.sendBodyAndHeader("direct:start", "A", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "B", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "C", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testMultipleCompletionPredicateBeforeTimeout() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("A+B+C+END", "D+E+END", "F+G+H+I+END");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "B", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "C", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+ template.sendBodyAndHeader("direct:start", "D", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "E", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+ template.sendBodyAndHeader("direct:start", "F", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "G", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "H", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "I", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testCompletionPredicateBeforeTimeoutTwoGroups() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("A+B+C+END", "1+2+3+4+END");
+ // should be faster than 10 seconds
+ mock.setResultWaitTime(10000);
+
+ template.sendBodyAndHeader("direct:start", "A", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "1", "id", "bar");
+ template.sendBodyAndHeader("direct:start", "2", "id", "bar");
+ template.sendBodyAndHeader("direct:start", "B", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "C", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "3", "id", "bar");
+ template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+ template.sendBodyAndHeader("direct:start", "4", "id", "bar");
+ template.sendBodyAndHeader("direct:start", "END", "id", "bar");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testMultipleCompletionPredicateBeforeTimeoutTwoGroups() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("A+B+C+END", "1+2+3+4+END", "5+6+END", "D+E+END", "7+8+END", "F+G+H+I+END");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "B", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "C", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "1", "id", "bar");
+ template.sendBodyAndHeader("direct:start", "2", "id", "bar");
+ template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+ template.sendBodyAndHeader("direct:start", "D", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "3", "id", "bar");
+ template.sendBodyAndHeader("direct:start", "4", "id", "bar");
+ template.sendBodyAndHeader("direct:start", "END", "id", "bar");
+
+ template.sendBodyAndHeader("direct:start", "5", "id", "bar");
+ template.sendBodyAndHeader("direct:start", "6", "id", "bar");
+ template.sendBodyAndHeader("direct:start", "E", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "END", "id", "bar");
+
+ template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+ template.sendBodyAndHeader("direct:start", "F", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "7", "id", "bar");
+ template.sendBodyAndHeader("direct:start", "G", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "H", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "8", "id", "bar");
+ template.sendBodyAndHeader("direct:start", "END", "id", "bar");
+
+ template.sendBodyAndHeader("direct:start", "I", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInAggregatingStrategy())
+ .completionPredicate(body().contains("END")).batchTimeout(20000)
+ .to("mock:aggregated");
+ }
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date