You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/22 16:11:06 UTC
svn commit: r956902 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ main/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/async/ test/resources/
Author: davsclaus
Date: Tue Jun 22 14:11:05 2010
New Revision: 956902
URL: http://svn.apache.org/viewvc?rev=956902&view=rev
Log:
CAMEL-2838: Multicast, Recipient List and Splitter EIPs now support async routing engine. Work in progress.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ProcessorExchangePair.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList4Test.java
- copied, changed from r956800, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultTracedRouteNodes.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListTest.java
camel/trunk/camel-core/src/test/resources/log4j.properties
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultTracedRouteNodes.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultTracedRouteNodes.java?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultTracedRouteNodes.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultTracedRouteNodes.java Tue Jun 22 14:11:05 2010
@@ -74,7 +74,9 @@ public class DefaultTracedRouteNodes imp
}
public void popBlock() {
- routeNodes.pop();
+ if (!routeNodes.isEmpty()) {
+ routeNodes.pop();
+ }
}
public void pushBlock() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Tue Jun 22 14:11:05 2010
@@ -18,6 +18,7 @@ package org.apache.camel.processor;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
@@ -28,6 +29,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
@@ -37,9 +40,11 @@ import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.TracedRouteNodes;
+import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.EventHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
@@ -59,44 +64,55 @@ import static org.apache.camel.util.Obje
* @see Pipeline
* @version $Revision$
*/
-public class MulticastProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
+public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable {
private static final transient Log LOG = LogFactory.getLog(MulticastProcessor.class);
/**
* Class that represent each step in the multicast route to do
*/
- static final class ProcessorExchangePair {
+ static final class DefaultProcessorExchangePair implements ProcessorExchangePair {
+ private final int index;
private final Processor processor;
private final Processor prepared;
private final Exchange exchange;
- /**
- * Private constructor as you must use the static creator
- * {@link org.apache.camel.processor.MulticastProcessor#createProcessorExchangePair(org.apache.camel.Processor,
- * org.apache.camel.Exchange)} which prepares the processor before its ready to be used.
- *
- * @param processor the original processor
- * @param prepared the prepared processor
- * @param exchange the exchange
- */
- private ProcessorExchangePair(Processor processor, Processor prepared, Exchange exchange) {
+ private DefaultProcessorExchangePair(int index, Processor processor, Processor prepared, Exchange exchange) {
+ this.index = index;
this.processor = processor;
this.prepared = prepared;
this.exchange = exchange;
}
- public Processor getProcessor() {
- return processor;
+ public int getIndex() {
+ return index;
+ }
+
+ public Exchange getExchange() {
+ return exchange;
+ }
+
+ public Producer getProducer() {
+ if (processor instanceof Producer) {
+ return (Producer) processor;
+ }
+ return null;
}
- public Processor getPrepared() {
+ public Processor getProcessor() {
return prepared;
}
- public Exchange getExchange() {
- return exchange;
+ public void begin() {
+ // noop
+ LOG.trace("ProcessorExchangePair #" + index + " begin: " + exchange);
+ }
+
+ public void done() {
+ // noop
+ LOG.trace("ProcessorExchangePair #" + index + " done: " + exchange);
}
+
}
private final CamelContext camelContext;
@@ -118,7 +134,6 @@ public class MulticastProcessor extends
public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
boolean parallelProcessing, ExecutorService executorService, boolean streaming, boolean stopOnException) {
notNull(camelContext, "camelContext");
- notNull(processors, "processors");
this.camelContext = camelContext;
this.processors = processors;
this.aggregationStrategy = aggregationStrategy;
@@ -143,33 +158,51 @@ public class MulticastProcessor extends
}
public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ boolean sync = true;
+
final AtomicExchange result = new AtomicExchange();
- final Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
+ final Iterable<ProcessorExchangePair> pairs;
// multicast uses fine grained error handling on the output processors
// so use try .. catch to cater for this
try {
+ pairs = createProcessorExchangePairs(exchange);
if (isParallelProcessing()) {
// ensure an executor is set when running in parallel
ObjectHelper.notNull(executorService, "executorService", this);
- doProcessParallel(result, pairs, isStreaming());
+ doProcessParallel(exchange, result, pairs, isStreaming(), callback);
} else {
- doProcessSequential(result, pairs);
+ sync = doProcessSequential(exchange, result, pairs, callback);
+ }
+
+ if (!sync) {
+ // the remainder of the multicast will be completed async
+ // so we break out now; the callback will then be invoked and continue routing from where we left off
+ return false;
}
+ // copy results back to the original exchange
if (result.get() != null) {
ExchangeHelper.copyResults(exchange, result.get());
}
- } catch (Exception e) {
+ } catch (Throwable e) {
// multicast uses error handling on its output processors and they have tried to redeliver
// so we shall signal back to the other error handlers that we are exhausted and they should not
// also try to redeliver as we will then do that twice
exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
exchange.setException(e);
}
+
+ callback.done(true);
+ return true;
}
- protected void doProcessParallel(final AtomicExchange result, Iterable<ProcessorExchangePair> pairs, boolean streaming) throws InterruptedException, ExecutionException {
+ protected void doProcessParallel(final Exchange original, final AtomicExchange result, Iterable<ProcessorExchangePair> pairs,
+ boolean streaming, final AsyncCallback callback) throws InterruptedException, ExecutionException {
final CompletionService<Exchange> completion;
final AtomicBoolean running = new AtomicBoolean(true);
@@ -183,9 +216,9 @@ public class MulticastProcessor extends
final AtomicInteger total = new AtomicInteger(0);
- for (ProcessorExchangePair pair : pairs) {
- final Processor processor = pair.getProcessor();
- final Processor prepared = pair.getPrepared();
+ final Iterator<ProcessorExchangePair> it = pairs.iterator();
+ while (it.hasNext()) {
+ final ProcessorExchangePair pair = it.next();
final Exchange subExchange = pair.getExchange();
updateNewExchange(subExchange, total.intValue(), pairs);
@@ -196,7 +229,7 @@ public class MulticastProcessor extends
return subExchange;
}
- doProcess(processor, prepared, subExchange);
+ doProcess(original, result, it, pair, callback);
// should we stop in case of an exception occurred during processing?
if (stopOnException && subExchange.getException() != null) {
@@ -228,16 +261,28 @@ public class MulticastProcessor extends
}
}
- protected void doProcessSequential(AtomicExchange result, Iterable<ProcessorExchangePair> pairs) throws Exception {
+ protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception {
int total = 0;
+ Iterator<ProcessorExchangePair> it = pairs.iterator();
- for (ProcessorExchangePair pair : pairs) {
- Processor processor = pair.getProcessor();
- Processor prepared = pair.getPrepared();
+ while (it.hasNext()) {
+ ProcessorExchangePair pair = it.next();
Exchange subExchange = pair.getExchange();
updateNewExchange(subExchange, total, pairs);
- doProcess(processor, prepared, subExchange);
+ boolean sync = doProcess(original, result, it, pair, callback);
+ if (!sync) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchangeId: " + pair.getExchange().getExchangeId() + " is continued being processed asynchronously");
+ }
+ // the remainder of the multicast will be completed async
+ // so we break out now; the callback will then be invoked and continue routing from where we left off
+ return false;
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchangeId: " + pair.getExchange().getExchangeId() + " is continued being processed synchronously");
+ }
// should we stop in case of an exception occurred during processing?
if (stopOnException && subExchange.getException() != null) {
@@ -257,14 +302,23 @@ public class MulticastProcessor extends
if (LOG.isDebugEnabled()) {
LOG.debug("Done sequential processing " + total + " exchanges");
}
+
+ return true;
}
- private void doProcess(Processor processor, Processor prepared, Exchange exchange) {
+ private boolean doProcess(final Exchange original, final AtomicExchange result, final Iterator<ProcessorExchangePair> it,
+ final ProcessorExchangePair pair, final AsyncCallback callback) {
+ boolean sync = true;
+
+ final Exchange exchange = pair.getExchange();
+ Processor processor = pair.getProcessor();
+ Producer producer = pair.getProducer();
+
TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;
// compute time taken if sending to another endpoint
StopWatch watch = null;
- if (processor instanceof Producer) {
+ if (producer != null) {
watch = new StopWatch();
}
@@ -275,21 +329,86 @@ public class MulticastProcessor extends
}
// let the prepared process it
- prepared.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
+ AsyncProcessor async = AsyncProcessorTypeConverter.convert(processor);
+ pair.begin();
+ sync = async.process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ pair.done();
+
+ // we only have to handle async completion of the multicast
+ if (doneSync) {
+ return;
+ }
+
+ // TODO: total number
+ // continue processing the multicast asynchronously
+ Exchange subExchange = exchange;
+ int total = 0;
+
+ while (it.hasNext()) {
+
+ if (stopOnException && exchange.getException() != null) {
+ // wrap in exception to explain where it failed
+ exchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
+ callback.done(false);
+ return;
+ }
+
+ if (aggregationStrategy != null) {
+ doAggregate(result, subExchange);
+ }
+
+ if (it.hasNext()) {
+ // prepare and run the next
+ ProcessorExchangePair pair = it.next();
+ subExchange = pair.getExchange();
+ updateNewExchange(subExchange, total, null);
+ boolean sync = doProcess(original, result, it, pair, callback);
+
+ if (!sync) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchangeId: " + original.getExchangeId() + " is continued being processed asynchronously");
+ }
+ return;
+ }
+
+ total++;
+ }
+ }
+
+ // remember to test for stop on exception and aggregate before copying back results
+ if (stopOnException && exchange.getException() != null) {
+ // wrap in exception to explain where it failed
+ exchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
+ callback.done(false);
+ return;
+ }
+
+ if (aggregationStrategy != null) {
+ doAggregate(result, subExchange);
+ }
+
+ // copy results back to the original exchange
+ if (result.get() != null) {
+ ExchangeHelper.copyResults(original, result.get());
+ }
+ callback.done(false);
+ }
+ });
} finally {
// pop the block so by next round we have the same staring point and thus the tracing looks accurate
if (traced != null) {
traced.popBlock();
}
- if (processor instanceof Producer) {
+ if (producer != null) {
long timeTaken = watch.stop();
- Endpoint endpoint = ((Producer) processor).getEndpoint();
+ Endpoint endpoint = producer.getEndpoint();
// emit event that the exchange was sent to the endpoint
EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
}
}
+
+ return sync;
}
/**
@@ -314,9 +433,10 @@ public class MulticastProcessor extends
protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());
+ int index = 0;
for (Processor processor : processors) {
Exchange copy = exchange.copy();
- result.add(createProcessorExchangePair(processor, copy));
+ result.add(createProcessorExchangePair(index++, processor, copy));
}
return result;
@@ -332,7 +452,7 @@ public class MulticastProcessor extends
* @param exchange the exchange
* @return prepared for use
*/
- protected static ProcessorExchangePair createProcessorExchangePair(Processor processor, Exchange exchange) {
+ protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor, Exchange exchange) {
Processor prepared = processor;
// set property which endpoint we send to
@@ -355,7 +475,7 @@ public class MulticastProcessor extends
}
}
- return new ProcessorExchangePair(processor, prepared, exchange);
+ return new DefaultProcessorExchangePair(index, processor, prepared, exchange);
}
protected void doStart() throws Exception {
@@ -369,7 +489,7 @@ public class MulticastProcessor extends
ServiceHelper.stopServices(processors);
}
- private static void setToEndpoint(Exchange exchange, Processor processor) {
+ protected static void setToEndpoint(Exchange exchange, Processor processor) {
if (processor instanceof Producer) {
Producer producer = (Producer) processor;
exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ProcessorExchangePair.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ProcessorExchangePair.java?rev=956902&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ProcessorExchangePair.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ProcessorExchangePair.java Tue Jun 22 14:11:05 2010
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+
+/**
+ * Exchange pair to be executed by {@link org.apache.camel.processor.MulticastProcessor}.
+ *
+ * @version $Revision$
+ */
+public interface ProcessorExchangePair {
+
+ int getIndex();
+
+ Exchange getExchange();
+
+ Producer getProducer();
+
+ Processor getProcessor();
+
+ void begin();
+
+ void done();
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ProcessorExchangePair.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ProcessorExchangePair.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Tue Jun 22 14:11:05 2010
@@ -23,16 +23,17 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
-import org.apache.camel.Processor;
-import org.apache.camel.Producer;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
@@ -49,7 +50,7 @@ import static org.apache.camel.util.Obje
*
* @version $Revision$
*/
-public class RecipientList extends ServiceSupport implements Processor {
+public class RecipientList extends ServiceSupport implements AsyncProcessor {
private static final transient Log LOG = LogFactory.getLog(RecipientList.class);
private final CamelContext camelContext;
private ProducerCache producerCache;
@@ -93,53 +94,40 @@ public class RecipientList extends Servi
}
public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
+ public boolean process(Exchange exchange, AsyncCallback callback) {
if (!isStarted()) {
throw new IllegalStateException("RecipientList has not been started: " + this);
}
Object recipientList = expression.evaluate(exchange, Object.class);
- sendToRecipientList(exchange, recipientList);
+ return sendToRecipientList(exchange, recipientList, callback);
+ }
+
+ public boolean sendToRecipientList(Exchange exchange, Object routingSlip) {
+ // this method is invoked from @RecipientList so we bridge with an empty callback
+ // TODO: Have @RecipientList support async out of the box
+ return sendToRecipientList(exchange, routingSlip, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ // noop
+ }
+ });
}
/**
* Sends the given exchange to the recipient list
*/
- public void sendToRecipientList(Exchange exchange, Object recipientList) throws Exception {
+ public boolean sendToRecipientList(Exchange exchange, Object recipientList, AsyncCallback callback) {
Iterator<Object> iter = ObjectHelper.createIterator(recipientList, delimiter);
- // we should acquire and release the producers we need so we can leverage the producer
- // cache to the fullest
- Map<Endpoint, Producer> producers = new LinkedHashMap<Endpoint, Producer>();
- try {
- List<Processor> processors = new ArrayList<Processor>();
- while (iter.hasNext()) {
- Object recipient = iter.next();
- try {
- Endpoint endpoint = resolveEndpoint(exchange, recipient);
- // acquire producer which we then release later
- Producer producer = producerCache.acquireProducer(endpoint);
- processors.add(producer);
- producers.put(endpoint, producer);
- } catch (Exception e) {
- if (isIgnoreInvalidEndpoints()) {
- LOG.info("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
- } else {
- throw e;
- }
- }
- }
+ RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(),
+ isParallelProcessing(), getExecutorService(), false, isStopOnException());
+ rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
- MulticastProcessor mp = new MulticastProcessor(exchange.getContext(), processors, getAggregationStrategy(),
- isParallelProcessing(), getExecutorService(), false, isStopOnException());
-
- // now let the multicast process the exchange
- mp.process(exchange);
- } finally {
- // and release the producers back to the producer cache
- for (Map.Entry<Endpoint, Producer> entry : producers.entrySet()) {
- producerCache.releaseProducer(entry.getKey(), entry.getValue());
- }
- }
+ // now let the multicast process the exchange
+ return rlp.process(exchange, callback);
}
protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
@@ -162,7 +150,7 @@ public class RecipientList extends Servi
protected void doStop() throws Exception {
ServiceHelper.stopService(producerCache);
}
-
+
public boolean isIgnoreInvalidEndpoints() {
return ignoreInvalidEndpoints;
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java?rev=956902&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java Tue Jun 22 14:11:05 2010
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Implements a dynamic <a
+ * href="http://camel.apache.org/recipient-list.html">Recipient List</a>
+ * pattern where the list of actual endpoints to send a message exchange to are
+ * dependent on some dynamic expression.
+ * <p/>
+ * This implementation is a specialized {@link org.apache.camel.processor.MulticastProcessor} which is based
+ * on recipient lists. This implementation has to handle the fact that the processors are not known at design time
+ * but are evaluated at runtime from the dynamic recipient list. Therefore this implementation has to look up
+ * endpoints at runtime and create producers which act as the processors for the multicast processor which
+ * runs under the hood. This implementation also supports the asynchronous routing engine, which makes the code
+ * trickier.
+ *
+ * @version $Revision$
+ */
+public class RecipientListProcessor extends MulticastProcessor {
+
+ private static final transient Log LOG = LogFactory.getLog(RecipientListProcessor.class);
+ private final Iterator<Object> iter;
+ private boolean ignoreInvalidEndpoints;
+ private ProducerCache producerCache;
+
+ /**
+ * Class that represents each step in the recipient list to do
+ * <p/>
+ * This implementation ensures the provided producer is released back to the producer cache when
+ * it is done using it.
+ */
+ static final class RecipientProcessorExchangePair implements ProcessorExchangePair {
+ private final int index;
+ private final Endpoint endpoint;
+ private final Producer producer;
+ private Processor prepared;
+ private final Exchange exchange;
+ private final ProducerCache producerCache;
+
+ private RecipientProcessorExchangePair(int index, ProducerCache producerCache, Endpoint endpoint, Producer producer,
+ Processor prepared, Exchange exchange) {
+ this.index = index;
+ this.producerCache = producerCache;
+ this.endpoint = endpoint;
+ this.producer = producer;
+ this.prepared = prepared;
+ this.exchange = exchange;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public Exchange getExchange() {
+ return exchange;
+ }
+
+ public Producer getProducer() {
+ return producer;
+ }
+
+ public Processor getProcessor() {
+ return prepared;
+ }
+
+ public void begin() {
+ // we have already acquired and prepared the producer, so there is nothing to do here besides tracing
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("RecipientProcessorExchangePair #" + index + " begin: " + exchange);
+ }
+ }
+
+ public void done() {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("RecipientProcessorExchangePair #" + index + " done: " + exchange);
+ }
+ // when we are done we should release back in pool
+ try {
+ producerCache.releaseProducer(endpoint, producer);
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Error releasing producer: " + producer + ". This exception will be ignored.", e);
+ }
+ }
+ }
+
+ }
+
+ public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter) {
+ super(camelContext, null);
+ this.producerCache = producerCache;
+ this.iter = iter;
+ }
+
+ public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy) {
+ super(camelContext, null, aggregationStrategy);
+ this.producerCache = producerCache;
+ this.iter = iter;
+ }
+
+ public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
+ boolean parallelProcessing, ExecutorService executorService, boolean streaming, boolean stopOnException) {
+ super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, streaming, stopOnException);
+ this.producerCache = producerCache;
+ this.iter = iter;
+ }
+
+ public boolean isIgnoreInvalidEndpoints() {
+ return ignoreInvalidEndpoints;
+ }
+
+ public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
+ this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
+ }
+
+ @Override
+ protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
+ // here we iterate the recipient lists and create the exchange pair for each of those
+ List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>();
+
+ // at first we must lookup the endpoint and acquire the producer which can send to the endpoint
+ int index = 0;
+ while (iter.hasNext()) {
+ Object recipient = iter.next();
+ Endpoint endpoint;
+ Producer producer;
+ try {
+ endpoint = resolveEndpoint(exchange, recipient);
+ producer = producerCache.acquireProducer(endpoint);
+ } catch (Exception e) {
+ if (isIgnoreInvalidEndpoints()) {
+ LOG.info("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
+ continue;
+ } else {
+ // failure so break out
+ throw e;
+ }
+ }
+
+ // then create the exchange pair
+ Exchange copy = exchange.copy();
+ result.add(createProcessorExchangePair(index++, endpoint, producer, copy));
+ }
+
+ return result;
+ }
+
+ /**
+ * This logic is similar to MulticastProcessor but we have to return a RecipientProcessorExchangePair instead
+ */
+ protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange) {
+ Processor prepared = producer;
+
+ // set property which endpoint we send to
+ setToEndpoint(exchange, prepared);
+
+ // rework error handling to support fine grained error handling
+ if (exchange.getUnitOfWork() != null && exchange.getUnitOfWork().getRouteContext() != null) {
+ // wrap the producer in error handler so we have fine grained error handling on
+ // the output side instead of the input side
+ // this is needed to support redelivery on that output alone and not doing redelivery
+ // for the entire multicast block again which will start from scratch again
+ RouteContext routeContext = exchange.getUnitOfWork().getRouteContext();
+ ErrorHandlerBuilder builder = routeContext.getRoute().getErrorHandlerBuilder();
+ // create error handler (create error handler directly to keep it light weight,
+ // instead of using ProcessorDefinition.wrapInErrorHandler)
+ try {
+ prepared = builder.createErrorHandler(routeContext, prepared);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, exchange);
+ }
+
+ protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+ // trim strings as end users might have added spaces between separators
+ if (recipient instanceof String) {
+ recipient = ((String) recipient).trim();
+ }
+ return ExchangeHelper.resolveEndpoint(exchange, recipient);
+ }
+
+ protected void doStart() throws Exception {
+ super.doStart();
+ if (producerCache == null) {
+ producerCache = new ProducerCache(this, getCamelContext());
+ // add it as a service so we can manage it
+ getCamelContext().addService(producerCache);
+ }
+ ServiceHelper.startService(producerCache);
+ }
+
+ protected void doStop() throws Exception {
+ ServiceHelper.stopService(producerCache);
+ super.doStop();
+ }
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java Tue Jun 22 14:11:05 2010
@@ -268,7 +268,7 @@ public class RoutingSlip extends Service
exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
boolean sync = asyncProducer.process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
- // we only have to handle async completion of the pipeline
+ // we only have to handle async completion of the routing slip
if (doneSync) {
return;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Tue Jun 22 14:11:05 2010
@@ -23,6 +23,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
@@ -43,7 +45,7 @@ import static org.apache.camel.util.Obje
*
* @version $Revision$
*/
-public class Splitter extends MulticastProcessor implements Processor, Traceable {
+public class Splitter extends MulticastProcessor implements AsyncProcessor, Traceable {
private final Expression expression;
public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
@@ -70,8 +72,8 @@ public class Splitter extends MulticastP
}
@Override
- public void process(Exchange exchange) throws Exception {
- AggregationStrategy strategy = getAggregationStrategy();
+ public boolean process(Exchange exchange, final AsyncCallback callback) {
+ final AggregationStrategy strategy = getAggregationStrategy();
// if original aggregation strategy then store exchange
// on it as the original exchange
@@ -81,11 +83,14 @@ public class Splitter extends MulticastP
original.setOriginal(exchange);
}
- super.process(exchange);
-
- if (original != null) {
- // and remove the reference when we are done (due to thread local stuff)
- original.setOriginal(null);
+ // TODO: we will lose the original in the async routing engine when it returns false
+ try {
+ return super.process(exchange, callback);
+ } finally {
+ if (original != null) {
+ // and remove the reference when we are done (due to thread local stuff)
+ original.setOriginal(null);
+ }
}
}
@@ -108,6 +113,8 @@ public class Splitter extends MulticastP
public Iterator iterator() {
return new Iterator() {
+ private int index;
+
public boolean hasNext() {
return iterator.hasNext();
}
@@ -121,7 +128,7 @@ public class Splitter extends MulticastP
Message in = newExchange.getIn();
in.setBody(part);
}
- return createProcessorExchangePair(getProcessors().iterator().next(), newExchange);
+ return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange);
}
public void remove() {
@@ -141,6 +148,8 @@ public class Splitter extends MulticastP
} else {
result = new ArrayList<ProcessorExchangePair>();
}
+
+ int index = 0;
Iterator<Object> iter = ObjectHelper.createIterator(value);
while (iter.hasNext()) {
Object part = iter.next();
@@ -151,7 +160,7 @@ public class Splitter extends MulticastP
Message in = newExchange.getIn();
in.setBody(part);
}
- result.add(createProcessorExchangePair(getProcessors().iterator().next(), newExchange));
+ result.add(createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange));
}
return result;
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java Tue Jun 22 14:11:05 2010
@@ -39,8 +39,7 @@ public class AsyncEndpointRecipientList3
assertMockEndpointsSatisfied();
- // use same threads as its recipient list, and direct is the last in the recipient list
- assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
}
@Override
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList4Test.java (from r956800, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList4Test.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList4Test.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java&r1=956800&r2=956902&rev=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList4Test.java Tue Jun 22 14:11:05 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class AsyncEndpointRecipientList3Test extends ContextTestSupport {
+public class AsyncEndpointRecipientList4Test extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
@@ -39,8 +39,7 @@ public class AsyncEndpointRecipientList3
assertMockEndpointsSatisfied();
- // use same threads as its recipient list, and direct is the last in the recipient list
- assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
}
@Override
@@ -58,7 +57,7 @@ public class AsyncEndpointRecipientList3
beforeThreadName = Thread.currentThread().getName();
}
})
- .recipientList(constant("async:Hi Camel,direct:foo"));
+ .recipientList(constant("async:Hi Camel,async:Hi World,direct:foo"));
from("direct:foo")
.process(new Processor() {
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListTest.java?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListTest.java Tue Jun 22 14:11:05 2010
@@ -37,8 +37,7 @@ public class AsyncEndpointRecipientListT
String reply = template.requestBody("direct:start", "Hello Camel", String.class);
assertEquals("Bye Camel", reply);
- // should use same threads (recipient list is not async supported yet)
- assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
}
@Override
Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Tue Jun 22 14:11:05 2010
@@ -28,6 +28,9 @@ log4j.logger.org.apache.activemq.spring=
#log4j.logger.org.apache.camel.component.mock=DEBUG
#log4j.logger.org.apache.camel.component.file=TRACE
#log4j.logger.org.apache.camel.processor.Pipeline=TRACE
+#log4j.logger.org.apache.camel.processor.MulticastProcessor=TRACE
+#log4j.logger.org.apache.camel.processor.RecipientList=TRACE
+#log4j.logger.org.apache.camel.processor.RecipientListProcessor=TRACE
#log4j.logger.org.apache.camel.processor.RoutingSlip=TRACE
#log4j.logger.org.apache.camel.processor.loadbalancer=TRACE
log4j.logger.org.apache.camel.impl.converter=WARN