You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/01 11:36:35 UTC

svn commit: r959570 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/Enricher.java test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java

Author: davsclaus
Date: Thu Jul  1 09:36:35 2010
New Revision: 959570

URL: http://svn.apache.org/viewvc?rev=959570&view=rev
Log:
CAMEL-2887: Enricher DSL support async routing engine.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java
      - copied, changed from r959564, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=959570&r1=959569&r2=959570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Thu Jul  1 09:36:35 2010
@@ -16,15 +16,20 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
 
@@ -40,8 +45,9 @@ import static org.apache.camel.util.Exch
  *
  * @see PollEnricher
  */
-public class Enricher extends ServiceSupport implements Processor {
+public class Enricher extends ServiceSupport implements AsyncProcessor {
 
+    private static final transient Log LOG = LogFactory.getLog(Enricher.class);
     private AggregationStrategy aggregationStrategy;
     private Producer producer;
 
@@ -84,6 +90,10 @@ public class Enricher extends ServiceSup
         this.aggregationStrategy = defaultAggregationStrategy();
     }
 
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
     /**
      * Enriches the input data (<code>exchange</code>) by first obtaining
      * additional data from an endpoint represented by an endpoint
@@ -93,12 +103,51 @@ public class Enricher extends ServiceSup
      * message exchange with the resource endpoint fails then no aggregation
      * will be done and the failed exchange content is copied over to the
      * original message exchange.
-     * 
+     *
      * @param exchange input data.
      */
-    public void process(Exchange exchange) throws Exception {
-        Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
-        producer.process(resourceExchange);
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
+
+        AsyncProcessor ap = AsyncProcessorTypeConverter.convert(producer);
+        boolean sync = AsyncProcessorHelper.process(ap, resourceExchange, new AsyncCallback() {
+            public void done(boolean doneSync) {
+                // we only have to handle async completion of the routing slip
+                if (doneSync) {
+                    return;
+                }
+
+                if (resourceExchange.isFailed()) {
+                    // copy resource exchange onto original exchange (preserving pattern)
+                    copyResultsPreservePattern(exchange, resourceExchange);
+                } else {
+                    prepareResult(exchange);
+
+                    // prepare the exchanges for aggregation
+                    ExchangeHelper.prepareAggregation(exchange, resourceExchange);
+                    Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+                    if (aggregatedExchange != null) {
+                        // copy aggregation result onto original exchange (preserving pattern)
+                        copyResultsPreservePattern(exchange, aggregatedExchange);
+                    }
+                }
+
+                callback.done(false);
+            }
+        });
+
+        if (!sync) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+            }
+            // the remainder of the routing slip will be completed async
+            // so we break out now, then the callback will be invoked which then continue routing from where we left here
+            return false;
+        }
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed synchronously");
+        }
 
         if (resourceExchange.isFailed()) {
             // copy resource exchange onto original exchange (preserving pattern)
@@ -114,6 +163,9 @@ public class Enricher extends ServiceSup
                 copyResultsPreservePattern(exchange, aggregatedExchange);
             }
         }
+
+        callback.done(true);
+        return true;
     }
 
     /**

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java (from r959564, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=959564&r2=959570&rev=959570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java Thu Jul  1 09:36:35 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
 /**
  * @version $Revision$
  */
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointEnricherTest extends ContextTestSupport {
 
     private static String beforeThreadName;
     private static String afterThreadName;
@@ -57,7 +57,7 @@ public class AsyncEndpointTest extends C
                                 beforeThreadName = Thread.currentThread().getName();
                             }
                         })
-                        .to("async:Bye Camel")
+                        .enrich("async:Bye Camel")
                         .process(new Processor() {
                             public void process(Exchange exchange) throws Exception {
                                 afterThreadName = Thread.currentThread().getName();
@@ -70,4 +70,4 @@ public class AsyncEndpointTest extends C
         };
     }
 
-}
+}
\ No newline at end of file