You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/29 11:43:42 UTC

svn commit: r958905 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/util/ test/java/org/apache/camel/processor/async/

Author: davsclaus
Date: Tue Jun 29 09:43:41 2010
New Revision: 958905

URL: http://svn.apache.org/viewvc?rev=958905&view=rev
Log:
CAMEL-2871: Loop DSL now support async routing engine.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointLoopTest.java
      - copied, changed from r958893, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoopDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoopDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoopDefinition.java?rev=958905&r1=958904&r2=958905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoopDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoopDefinition.java Tue Jun 29 09:43:41 2010
@@ -22,7 +22,6 @@ import javax.xml.bind.annotation.XmlRoot
 
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
-import org.apache.camel.builder.ExpressionClause;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.LoopProcessor;
 import org.apache.camel.spi.RouteContext;
@@ -69,14 +68,4 @@ public class LoopDefinition extends Expr
         return new LoopProcessor(getExpression().createExpression(routeContext), output);
     }
     
-     // Fluent API
-    //-------------------------------------------------------------------------
-
-    /**
-     * Set the expression that LoopType will use
-     * @return the builder
-     */
-    public ExpressionClause<LoopDefinition> expression() {
-        return ExpressionClause.createAndSetExpression(this);
-    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java?rev=958905&r1=958904&r2=958905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java Tue Jun 29 09:43:41 2010
@@ -16,8 +16,12 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
@@ -28,33 +32,120 @@ import org.apache.commons.logging.LogFac
  *
  * @version $Revision$
  */
-public class LoopProcessor extends DelegateProcessor implements Traceable {
+public class LoopProcessor extends DelegateAsyncProcessor implements Traceable {
     private static final Log LOG = LogFactory.getLog(LoopProcessor.class);
 
     private final Expression expression;
 
-    // TODO: should support async routing engine
-
     public LoopProcessor(Expression expression, Processor processor) {
         super(processor);
         this.expression = expression;
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // use atomic integer to be able to pass reference and keep track on the values
+        AtomicInteger index = new AtomicInteger();
+        AtomicInteger count = new AtomicInteger();
+
         // Intermediate conversion to String is needed when direct conversion to Integer is not available
         // but evaluation result is a textual representation of a numeric value.
         String text = expression.evaluate(exchange, String.class);
-        int count = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
+        try {
+            int num = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
+            count.set(num);
+        } catch (NoTypeConversionAvailableException e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
 
+        // set the size before we start
         exchange.setProperty(Exchange.LOOP_SIZE, count);
-        for (int i = 0; i < count; i++) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("LoopProcessor: iteration #" + i);
+
+        // loop synchronously
+        while (index.get() < count.get()) {
+
+            // and prepare for next iteration
+            ExchangeHelper.prepareOutToIn(exchange);
+            boolean sync = process(exchange, callback, index, count);
+
+            if (!sync) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+                }
+                // the remainder of the routing slip will be completed async
+                // so we break out now, then the callback will be invoked which then continue routing from where we left here
+                return false;
+            }
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed synchronously");
             }
-            exchange.setProperty(Exchange.LOOP_INDEX, i);
-            super.process(exchange);
+
+            // increment counter before next loop
+            index.getAndIncrement();
         }
+
+        // we are done so prepare the result
+        ExchangeHelper.prepareOutToIn(exchange);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Processing complete for exchangeId: " + exchange.getExchangeId() + " >>> " + exchange);
+        }
+        callback.done(true);
+        return true;
+    }
+
+    protected boolean process(final Exchange exchange, final AsyncCallback callback,
+                              final AtomicInteger index, final AtomicInteger count) {
+
+        // set current index as property
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("LoopProcessor: iteration #" + index.get());
+        }
+        exchange.setProperty(Exchange.LOOP_INDEX, index.get());
+
+        boolean sync = processNext(exchange, new AsyncCallback() {
+            public void done(boolean doneSync) {
+                // we only have to handle async completion of the routing slip
+                if (doneSync) {
+                    return;
+                }
+
+                // increment index as we have just processed once
+                index.getAndIncrement();
+
+                // continue looping asynchronously
+                while (index.get() < count.get()) {
+
+                    // and prepare for next iteration
+                    ExchangeHelper.prepareOutToIn(exchange);
+
+                    // process again
+                    boolean sync = process(exchange, callback, index, count);
+                    if (!sync) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+                        }
+                        // the remainder of the routing slip will be completed async
+                        // so we break out now, then the callback will be invoked which then continue routing from where we left here
+                        return;
+                    }
+
+                    // increment counter before next loop
+                    index.getAndIncrement();
+                }
+
+                // we are done so prepare the result
+                ExchangeHelper.prepareOutToIn(exchange);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing complete for exchangeId: " + exchange.getExchangeId() + " >>> " + exchange);
+                }
+                callback.done(false);
+            }
+        });
+
+        return sync;
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java?rev=958905&r1=958904&r2=958905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java Tue Jun 29 09:43:41 2010
@@ -29,6 +29,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -69,7 +70,7 @@ public class TryProcessor extends Servic
         Iterator<AsyncProcessor> processors = getProcessors().iterator();
 
         while (continueRouting(processors, exchange)) {
-            prepareResult(exchange);
+            ExchangeHelper.prepareOutToIn(exchange);
 
             // process the next processor
             AsyncProcessor processor = processors.next();
@@ -90,7 +91,7 @@ public class TryProcessor extends Servic
             }
         }
 
-        prepareResult(exchange);
+        ExchangeHelper.prepareOutToIn(exchange);
         if (LOG.isTraceEnabled()) {
             LOG.trace("Processing complete for exchangeId: " + exchange.getExchangeId() + " >>> " + exchange);
         }
@@ -116,7 +117,7 @@ public class TryProcessor extends Servic
 
                 // continue processing the try .. catch .. finally asynchronously
                 while (continueRouting(processors, exchange)) {
-                    prepareResult(exchange);
+                    ExchangeHelper.prepareOutToIn(exchange);
 
                     // process the next processor
                     AsyncProcessor processor = processors.next();
@@ -132,7 +133,7 @@ public class TryProcessor extends Servic
                     }
                 }
 
-                prepareResult(exchange);
+                ExchangeHelper.prepareOutToIn(exchange);
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Processing complete for exchangeId: " + exchange.getExchangeId() + " >>> " + exchange);
                 }
@@ -163,19 +164,6 @@ public class TryProcessor extends Servic
         return it.hasNext();
     }
 
-    /**
-     * Strategy to prepare results before next iterator or when we are complete
-     *
-     * @param exchange the result exchange
-     */
-    protected static void prepareResult(Exchange exchange) {
-        // we are routing using pipes and filters so we need to manually copy OUT to IN
-        if (exchange.hasOut()) {
-            exchange.getIn().copyFrom(exchange.getOut());
-            exchange.setOut(null);
-        }
-    }
-
     protected void doStart() throws Exception {
         processors = new ArrayList<AsyncProcessor>();
         processors.add(tryProcessor);
@@ -321,7 +309,7 @@ public class TryProcessor extends Servic
                     }
 
                     // signal callback to continue routing async
-                    prepareResult(exchange);
+                    ExchangeHelper.prepareOutToIn(exchange);
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("Processing complete for exchangeId: " + exchange.getExchangeId() + " >>> " + exchange);
                     }
@@ -395,7 +383,7 @@ public class TryProcessor extends Servic
                     }
 
                     // signal callback to continue routing async
-                    prepareResult(exchange);
+                    ExchangeHelper.prepareOutToIn(exchange);
                     callback.done(false);
                 }
             });

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?rev=958905&r1=958904&r2=958905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Tue Jun 29 09:43:41 2010
@@ -592,4 +592,18 @@ public final class ExchangeHelper {
         return sb.toString().trim();
     }
 
+    /**
+     * Strategy to prepare results before next iterator or when we are complete,
+     * which is done by copying OUT to IN, so there is only an IN as input
+     * for the next iteration.
+     *
+     * @param exchange the exchange to prepare
+     */
+    public static void prepareOutToIn(Exchange exchange) {
+        // we are routing using pipes and filters so we need to manually copy OUT to IN
+        if (exchange.hasOut()) {
+            exchange.getIn().copyFrom(exchange.getOut());
+            exchange.setOut(null);
+        }
+    }
 }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointLoopTest.java (from r958893, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointLoopTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointLoopTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=958893&r2=958905&rev=958905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointLoopTest.java Tue Jun 29 09:43:41 2010
@@ -24,13 +24,14 @@ import org.apache.camel.builder.RouteBui
 /**
  * @version $Revision$
  */
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointLoopTest extends ContextTestSupport {
 
     private static String beforeThreadName;
     private static String afterThreadName;
 
     public void testAsyncEndpoint() throws Exception {
         getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:loop").expectedBodiesReceived("Hello Camel", "Bye Camel");
         getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
         getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
 
@@ -57,7 +58,10 @@ public class AsyncEndpointTest extends C
                                 beforeThreadName = Thread.currentThread().getName();
                             }
                         })
-                        .to("async:Bye Camel")
+                        .loop(2)
+                            .to("mock:loop")
+                            .to("async:Bye Camel")
+                        .end()
                         .process(new Processor() {
                             public void process(Exchange exchange) throws Exception {
                                 afterThreadName = Thread.currentThread().getName();
@@ -70,4 +74,4 @@ public class AsyncEndpointTest extends C
         };
     }
 
-}
+}
\ No newline at end of file