You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/29 11:43:42 UTC
svn commit: r958905 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/
main/java/org/apache/camel/util/ test/java/org/apache/camel/processor/async/
Author: davsclaus
Date: Tue Jun 29 09:43:41 2010
New Revision: 958905
URL: http://svn.apache.org/viewvc?rev=958905&view=rev
Log:
CAMEL-2871: Loop DSL now supports the async routing engine.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointLoopTest.java
- copied, changed from r958893, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoopDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoopDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoopDefinition.java?rev=958905&r1=958904&r2=958905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoopDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoopDefinition.java Tue Jun 29 09:43:41 2010
@@ -22,7 +22,6 @@ import javax.xml.bind.annotation.XmlRoot
import org.apache.camel.Expression;
import org.apache.camel.Processor;
-import org.apache.camel.builder.ExpressionClause;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.LoopProcessor;
import org.apache.camel.spi.RouteContext;
@@ -69,14 +68,4 @@ public class LoopDefinition extends Expr
return new LoopProcessor(getExpression().createExpression(routeContext), output);
}
- // Fluent API
- //-------------------------------------------------------------------------
-
- /**
- * Set the expression that LoopType will use
- * @return the builder
- */
- public ExpressionClause<LoopDefinition> expression() {
- return ExpressionClause.createAndSetExpression(this);
- }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java?rev=958905&r1=958904&r2=958905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java Tue Jun 29 09:43:41 2010
@@ -16,8 +16,12 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
@@ -28,33 +32,120 @@ import org.apache.commons.logging.LogFac
*
* @version $Revision$
*/
-public class LoopProcessor extends DelegateProcessor implements Traceable {
+public class LoopProcessor extends DelegateAsyncProcessor implements Traceable {
private static final Log LOG = LogFactory.getLog(LoopProcessor.class);
private final Expression expression;
- // TODO: should support async routing engine
-
public LoopProcessor(Expression expression, Processor processor) {
super(processor);
this.expression = expression;
}
@Override
- public void process(Exchange exchange) throws Exception {
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ // use atomic integer to be able to pass reference and keep track on the values
+ AtomicInteger index = new AtomicInteger();
+ AtomicInteger count = new AtomicInteger();
+
// Intermediate conversion to String is needed when direct conversion to Integer is not available
// but evaluation result is a textual representation of a numeric value.
String text = expression.evaluate(exchange, String.class);
- int count = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
+ try {
+ int num = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
+ count.set(num);
+ } catch (NoTypeConversionAvailableException e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
+ // set the size before we start
exchange.setProperty(Exchange.LOOP_SIZE, count);
- for (int i = 0; i < count; i++) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("LoopProcessor: iteration #" + i);
+
+ // loop synchronously
+ while (index.get() < count.get()) {
+
+ // and prepare for next iteration
+ ExchangeHelper.prepareOutToIn(exchange);
+ boolean sync = process(exchange, callback, index, count);
+
+ if (!sync) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+ }
+ // the remainder of the routing slip will be completed async
+ // so we break out now, then the callback will be invoked which then continue routing from where we left here
+ return false;
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed synchronously");
}
- exchange.setProperty(Exchange.LOOP_INDEX, i);
- super.process(exchange);
+
+ // increment counter before next loop
+ index.getAndIncrement();
}
+
+ // we are done so prepare the result
+ ExchangeHelper.prepareOutToIn(exchange);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing complete for exchangeId: " + exchange.getExchangeId() + " >>> " + exchange);
+ }
+ callback.done(true);
+ return true;
+ }
+
+ protected boolean process(final Exchange exchange, final AsyncCallback callback,
+ final AtomicInteger index, final AtomicInteger count) {
+
+ // set current index as property
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LoopProcessor: iteration #" + index.get());
+ }
+ exchange.setProperty(Exchange.LOOP_INDEX, index.get());
+
+ boolean sync = processNext(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ // we only have to handle async completion of the routing slip
+ if (doneSync) {
+ return;
+ }
+
+ // increment index as we have just processed once
+ index.getAndIncrement();
+
+ // continue looping asynchronously
+ while (index.get() < count.get()) {
+
+ // and prepare for next iteration
+ ExchangeHelper.prepareOutToIn(exchange);
+
+ // process again
+ boolean sync = process(exchange, callback, index, count);
+ if (!sync) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+ }
+ // the remainder of the routing slip will be completed async
+ // so we break out now, then the callback will be invoked which then continue routing from where we left here
+ return;
+ }
+
+ // increment counter before next loop
+ index.getAndIncrement();
+ }
+
+ // we are done so prepare the result
+ ExchangeHelper.prepareOutToIn(exchange);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing complete for exchangeId: " + exchange.getExchangeId() + " >>> " + exchange);
+ }
+ callback.done(false);
+ }
+ });
+
+ return sync;
}
@Override
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java?rev=958905&r1=958904&r2=958905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java Tue Jun 29 09:43:41 2010
@@ -29,6 +29,7 @@ import org.apache.camel.Processor;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -69,7 +70,7 @@ public class TryProcessor extends Servic
Iterator<AsyncProcessor> processors = getProcessors().iterator();
while (continueRouting(processors, exchange)) {
- prepareResult(exchange);
+ ExchangeHelper.prepareOutToIn(exchange);
// process the next processor
AsyncProcessor processor = processors.next();
@@ -90,7 +91,7 @@ public class TryProcessor extends Servic
}
}
- prepareResult(exchange);
+ ExchangeHelper.prepareOutToIn(exchange);
if (LOG.isTraceEnabled()) {
LOG.trace("Processing complete for exchangeId: " + exchange.getExchangeId() + " >>> " + exchange);
}
@@ -116,7 +117,7 @@ public class TryProcessor extends Servic
// continue processing the try .. catch .. finally asynchronously
while (continueRouting(processors, exchange)) {
- prepareResult(exchange);
+ ExchangeHelper.prepareOutToIn(exchange);
// process the next processor
AsyncProcessor processor = processors.next();
@@ -132,7 +133,7 @@ public class TryProcessor extends Servic
}
}
- prepareResult(exchange);
+ ExchangeHelper.prepareOutToIn(exchange);
if (LOG.isTraceEnabled()) {
LOG.trace("Processing complete for exchangeId: " + exchange.getExchangeId() + " >>> " + exchange);
}
@@ -163,19 +164,6 @@ public class TryProcessor extends Servic
return it.hasNext();
}
- /**
- * Strategy to prepare results before next iterator or when we are complete
- *
- * @param exchange the result exchange
- */
- protected static void prepareResult(Exchange exchange) {
- // we are routing using pipes and filters so we need to manually copy OUT to IN
- if (exchange.hasOut()) {
- exchange.getIn().copyFrom(exchange.getOut());
- exchange.setOut(null);
- }
- }
-
protected void doStart() throws Exception {
processors = new ArrayList<AsyncProcessor>();
processors.add(tryProcessor);
@@ -321,7 +309,7 @@ public class TryProcessor extends Servic
}
// signal callback to continue routing async
- prepareResult(exchange);
+ ExchangeHelper.prepareOutToIn(exchange);
if (LOG.isTraceEnabled()) {
LOG.trace("Processing complete for exchangeId: " + exchange.getExchangeId() + " >>> " + exchange);
}
@@ -395,7 +383,7 @@ public class TryProcessor extends Servic
}
// signal callback to continue routing async
- prepareResult(exchange);
+ ExchangeHelper.prepareOutToIn(exchange);
callback.done(false);
}
});
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?rev=958905&r1=958904&r2=958905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Tue Jun 29 09:43:41 2010
@@ -592,4 +592,18 @@ public final class ExchangeHelper {
return sb.toString().trim();
}
+ /**
+ * Strategy to prepare results before the next iteration or when we are complete,
+ * which is done by copying OUT to IN, so there is only an IN as input
+ * for the next iteration.
+ *
+ * @param exchange the exchange to prepare
+ */
+ public static void prepareOutToIn(Exchange exchange) {
+ // we are routing using pipes and filters so we need to manually copy OUT to IN
+ if (exchange.hasOut()) {
+ exchange.getIn().copyFrom(exchange.getOut());
+ exchange.setOut(null);
+ }
+ }
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointLoopTest.java (from r958893, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointLoopTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointLoopTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=958893&r2=958905&rev=958905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointLoopTest.java Tue Jun 29 09:43:41 2010
@@ -24,13 +24,14 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointLoopTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
public void testAsyncEndpoint() throws Exception {
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:loop").expectedBodiesReceived("Hello Camel", "Bye Camel");
getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
@@ -57,7 +58,10 @@ public class AsyncEndpointTest extends C
beforeThreadName = Thread.currentThread().getName();
}
})
- .to("async:Bye Camel")
+ .loop(2)
+ .to("mock:loop")
+ .to("async:Bye Camel")
+ .end()
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
afterThreadName = Thread.currentThread().getName();
@@ -70,4 +74,4 @@ public class AsyncEndpointTest extends C
};
}
-}
+}
\ No newline at end of file