You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/01 11:36:35 UTC
svn commit: r959570 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/Enricher.java
test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java
Author: davsclaus
Date: Thu Jul 1 09:36:35 2010
New Revision: 959570
URL: http://svn.apache.org/viewvc?rev=959570&view=rev
Log:
CAMEL-2887: Enricher DSL now supports the async routing engine.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java
- copied, changed from r959564, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=959570&r1=959569&r2=959570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Thu Jul 1 09:36:35 2010
@@ -16,15 +16,20 @@
*/
package org.apache.camel.processor;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
@@ -40,8 +45,9 @@ import static org.apache.camel.util.Exch
*
* @see PollEnricher
*/
-public class Enricher extends ServiceSupport implements Processor {
+public class Enricher extends ServiceSupport implements AsyncProcessor {
+ private static final transient Log LOG = LogFactory.getLog(Enricher.class);
private AggregationStrategy aggregationStrategy;
private Producer producer;
@@ -84,6 +90,10 @@ public class Enricher extends ServiceSup
this.aggregationStrategy = defaultAggregationStrategy();
}
+ public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
/**
* Enriches the input data (<code>exchange</code>) by first obtaining
* additional data from an endpoint represented by an endpoint
@@ -93,12 +103,51 @@ public class Enricher extends ServiceSup
* message exchange with the resource endpoint fails then no aggregation
* will be done and the failed exchange content is copied over to the
* original message exchange.
- *
+ *
* @param exchange input data.
*/
- public void process(Exchange exchange) throws Exception {
- Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
- producer.process(resourceExchange);
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
+ final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
+
+ AsyncProcessor ap = AsyncProcessorTypeConverter.convert(producer);
+ boolean sync = AsyncProcessorHelper.process(ap, resourceExchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ // we only have to handle async completion of the resource exchange here (sync completion is handled after the helper returns)
+ if (doneSync) {
+ return;
+ }
+
+ if (resourceExchange.isFailed()) {
+ // copy resource exchange onto original exchange (preserving pattern)
+ copyResultsPreservePattern(exchange, resourceExchange);
+ } else {
+ prepareResult(exchange);
+
+ // prepare the exchanges for aggregation
+ ExchangeHelper.prepareAggregation(exchange, resourceExchange);
+ Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+ if (aggregatedExchange != null) {
+ // copy aggregation result onto original exchange (preserving pattern)
+ copyResultsPreservePattern(exchange, aggregatedExchange);
+ }
+ }
+
+ callback.done(false);
+ }
+ });
+
+ if (!sync) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+ }
+ // the resource exchange will be completed asynchronously, so we return now;
+ // the callback above then does the aggregation and signals completion via callback.done(false)
+ return false;
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed synchronously");
+ }
if (resourceExchange.isFailed()) {
// copy resource exchange onto original exchange (preserving pattern)
@@ -114,6 +163,9 @@ public class Enricher extends ServiceSup
copyResultsPreservePattern(exchange, aggregatedExchange);
}
}
+
+ callback.done(true);
+ return true;
}
/**
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java (from r959564, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=959564&r2=959570&rev=959570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java Thu Jul 1 09:36:35 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointEnricherTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
@@ -57,7 +57,7 @@ public class AsyncEndpointTest extends C
beforeThreadName = Thread.currentThread().getName();
}
})
- .to("async:Bye Camel")
+ .enrich("async:Bye Camel")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
afterThreadName = Thread.currentThread().getName();
@@ -70,4 +70,4 @@ public class AsyncEndpointTest extends C
};
}
-}
+}
\ No newline at end of file