You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ch...@apache.org on 2007/09/04 21:32:18 UTC
svn commit: r572785 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/Pipeline.java
test/java/org/apache/camel/processor/PipelineTest.java
Author: chirino
Date: Tue Sep 4 12:32:17 2007
New Revision: 572785
URL: http://svn.apache.org/viewvc?rev=572785&view=rev
Log:
Added more tests and fixed up the async case of pipeline.
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=572785&r1=572784&r2=572785&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Tue Sep 4 12:32:17 2007
@@ -102,27 +102,40 @@
exchange.throwException();
}
- public boolean process(Exchange exchange, AsyncCallback callback) {
+ public boolean process(Exchange original, AsyncCallback callback) {
Iterator<Processor> processors = getProcessors().iterator();
- Exchange nextExchange = exchange;
+ Exchange nextExchange = original;
+ boolean first = true;
while (processors.hasNext()) {
AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
- boolean sync = process(nextExchange, callback, processors, processor);
- // Continue processing the pipeline synchronously ...
- if (sync) {
- nextExchange = createNextExchange(processor, exchange);
+
+ if (nextExchange.isFailed()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Mesage exchange has failed so breaking out of pipeline: " + nextExchange + " exception: " + nextExchange.getException() + " fault: " + nextExchange.getFault(false));
+ }
+ break;
+ }
+ if (first) {
+ first = false;
} else {
+ nextExchange = createNextExchange(processor, original);
+ }
+ boolean sync = process(original, nextExchange, callback, processors, processor);
+ // Continue processing the pipeline synchronously ...
+ if (!sync) {
// The pipeline will be completed async...
- return true;
+ return false;
}
}
+
// If we get here then the pipeline was processed entirely
// synchronously.
+ ExchangeHelper.copyResults(original, nextExchange);
callback.done(true);
return true;
}
- private boolean process(final Exchange exchange, final AsyncCallback callback, final Iterator<Processor> processors, AsyncProcessor processor) {
+ private boolean process(final Exchange original, final Exchange exchange, final AsyncCallback callback, final Iterator<Processor> processors, AsyncProcessor processor) {
return processor.process(exchange, new AsyncCallback() {
public void done(boolean sync) {
@@ -136,12 +149,22 @@
Exchange nextExchange = exchange;
while( processors.hasNext() ) {
AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
+
+ if (nextExchange.isFailed()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Mesage exchange has failed so breaking out of pipeline: " + nextExchange + " exception: " + nextExchange.getException() + " fault: " + nextExchange.getFault(false));
+ }
+ break;
+ }
+
nextExchange = createNextExchange(processor, exchange);
- sync = process( nextExchange, callback, processors, processor);
+ sync = process( original, nextExchange, callback, processors, processor);
if( !sync ) {
return;
}
}
+
+ ExchangeHelper.copyResults(original, nextExchange);
callback.done(true);
}
});
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java?rev=572785&r1=572784&r2=572785&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java Tue Sep 4 12:32:17 2007
@@ -27,6 +27,37 @@
* @version $Revision: 1.1 $
*/
public class PipelineTest extends ContextTestSupport {
+
+ /**
+ * Simple processor that copies the in to the out and increments a counter.
+ * Used to verify that the pipeline actually takes the output of one stage of
+ * the pipe and feeds it in as input into the next stage.
+ */
+ private final class InToOut implements Processor {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getOut(true).copyFrom(exchange.getIn());
+ Integer counter = exchange.getIn().getHeader("copy-counter", Integer.class);
+ if (counter == null) {
+ counter = 0;
+ }
+ exchange.getOut().setHeader("copy-counter", counter + 1);
+ }
+ }
+
+ /**
+ * Simple processor that copies the in to the fault and increments a counter.
+ */
+ private final class InToFault implements Processor {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getFault(true).setBody(exchange.getIn().getBody());
+ Integer counter = exchange.getIn().getHeader("copy-counter", Integer.class);
+ if (counter == null) {
+ counter = 0;
+ }
+ exchange.getFault().setHeader("copy-counter", counter + 1);
+ }
+ }
+
protected MockEndpoint resultEndpoint;
public void testSendMessageThroughAPipeline() throws Exception {
@@ -46,6 +77,39 @@
assertEquals("Result body", 4, results.getOut().getBody());
}
+
+ public void testResultsReturned() throws Exception {
+ Exchange exchange = template.send("direct:b", new Processor() {
+ public void process(Exchange exchange) {
+ exchange.getIn().setBody("Hello World");
+ }
+ });
+
+ assertEquals("Hello World", exchange.getOut().getBody());
+ assertEquals(3, exchange.getOut().getHeader("copy-counter"));
+ }
+
+ /**
+ * Disabled for now until we figure out fault processing in the pipeline.
+ *
+ * @throws Exception
+ */
+ public void testFaultStopsPipeline() throws Exception {
+ Exchange exchange = template.send("direct:c", new Processor() {
+ public void process(Exchange exchange) {
+ exchange.getIn().setBody("Fault Message");
+ }
+ });
+
+ // Check the fault..
+ assertEquals("Fault Message", exchange.getFault().getBody());
+ assertEquals(2, exchange.getFault().getHeader("copy-counter"));
+
+ // Check the out Message.. It should have only been processed once.
+ // since the fault should stop it from going to the next process.
+ assertEquals(1, exchange.getOut().getHeader("copy-counter"));
+ }
+
@Override
protected void setUp() throws Exception {
super.setUp();
@@ -76,6 +140,11 @@
from("direct:x").process(processor);
from("direct:y").process(processor);
from("direct:z").process(processor);
+
+ // Create a route that uses the InToOut processor 3 times. The copy-counter header should be == 3
+ from("direct:b").process(new InToOut()).process(new InToOut()).process(new InToOut());
+ // Create a route that uses the InToFault processor.. the last InToOut will not be called since the Fault occurs before.
+ from("direct:c").process(new InToOut()).process(new InToFault()).process(new InToOut());
}
};
}