You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ch...@apache.org on 2007/09/04 21:32:18 UTC

svn commit: r572785 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/processor/Pipeline.java test/java/org/apache/camel/processor/PipelineTest.java

Author: chirino
Date: Tue Sep  4 12:32:17 2007
New Revision: 572785

URL: http://svn.apache.org/viewvc?rev=572785&view=rev
Log:
Added more tests and fixed up the async case of pipeline.

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=572785&r1=572784&r2=572785&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Tue Sep  4 12:32:17 2007
@@ -102,27 +102,40 @@
         exchange.throwException();
     }
 
-    public boolean process(Exchange exchange, AsyncCallback callback) {
+    public boolean process(Exchange original, AsyncCallback callback) {
         Iterator<Processor> processors = getProcessors().iterator();
-        Exchange nextExchange = exchange;
+        Exchange nextExchange = original;
+        boolean first = true;
         while (processors.hasNext()) {
             AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
-            boolean sync = process(nextExchange, callback, processors, processor);
-            // Continue processing the pipeline synchronously ...
-            if (sync) {
-                nextExchange = createNextExchange(processor, exchange);
+            
+            if (nextExchange.isFailed()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Mesage exchange has failed so breaking out of pipeline: " + nextExchange + " exception: " + nextExchange.getException() + " fault: " + nextExchange.getFault(false));
+                }
+                break;
+            }
+            if (first) {
+                first = false;
             } else {
+                nextExchange = createNextExchange(processor, original);
+            }
+            boolean sync = process(original, nextExchange, callback, processors, processor);
+            // Continue processing the pipeline synchronously ...
+            if (!sync) {
                 // The pipeline will be completed async...
-                return true;
+                return false;
             }
         }
+        
         // If we get here then the pipeline was processed entirely
         // synchronously.
+        ExchangeHelper.copyResults(original, nextExchange);
         callback.done(true);
         return true;
     }
 
-    private boolean process(final Exchange exchange, final AsyncCallback callback, final Iterator<Processor> processors, AsyncProcessor processor) {
+    private boolean process(final Exchange original, final Exchange exchange, final AsyncCallback callback, final Iterator<Processor> processors, AsyncProcessor processor) {
         return processor.process(exchange, new AsyncCallback() {
             public void done(boolean sync) {
 
@@ -136,12 +149,22 @@
                 Exchange nextExchange = exchange;
                 while( processors.hasNext() ) {
                     AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
+                    
+                    if (nextExchange.isFailed()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Mesage exchange has failed so breaking out of pipeline: " + nextExchange + " exception: " + nextExchange.getException() + " fault: " + nextExchange.getFault(false));
+                        }
+                        break;
+                    }
+
                     nextExchange = createNextExchange(processor, exchange);
-                    sync = process( nextExchange, callback, processors, processor);
+                    sync = process( original, nextExchange, callback, processors, processor);
                     if( !sync ) {
                         return;
                     }
                 }
+                
+                ExchangeHelper.copyResults(original, nextExchange);
                 callback.done(true);
             }
         });

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java?rev=572785&r1=572784&r2=572785&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java Tue Sep  4 12:32:17 2007
@@ -27,6 +27,37 @@
  * @version $Revision: 1.1 $
  */
 public class PipelineTest extends ContextTestSupport {
+    
+    /**
+     * Simple processor that copies the in to the out and increments a counter.
+     * Used to verify that the pipeline actually takes the output of one stage of 
+     * the pipe and feeds it in as input into the next stage.
+     */
+    private final class InToOut implements Processor {
+        public void process(Exchange exchange) throws Exception {            
+            exchange.getOut(true).copyFrom(exchange.getIn());
+            Integer counter = exchange.getIn().getHeader("copy-counter", Integer.class);
+            if (counter == null) {
+                counter = 0;
+            }
+            exchange.getOut().setHeader("copy-counter", counter + 1);
+        }
+    }
+
+    /**
+     * Simple processor that copies the in to the fault and increments a counter.
+     */
+    private final class InToFault implements Processor {
+        public void process(Exchange exchange) throws Exception {
+            exchange.getFault(true).setBody(exchange.getIn().getBody());
+            Integer counter = exchange.getIn().getHeader("copy-counter", Integer.class);
+            if (counter == null) {
+                counter = 0;
+            }
+            exchange.getFault().setHeader("copy-counter", counter + 1);
+        }
+    }
+
     protected MockEndpoint resultEndpoint;
 
     public void testSendMessageThroughAPipeline() throws Exception {
@@ -46,6 +77,39 @@
         assertEquals("Result body", 4, results.getOut().getBody());
     }
 
+    
+    public void testResultsReturned() throws Exception {
+        Exchange exchange = template.send("direct:b", new Processor() {
+            public void process(Exchange exchange) {
+                exchange.getIn().setBody("Hello World");
+            }
+        });
+        
+        assertEquals("Hello World", exchange.getOut().getBody());
+        assertEquals(3, exchange.getOut().getHeader("copy-counter"));        
+    }
+
+    /**
+     * Verifies that a fault raised in the middle of the pipeline stops the remaining processors from running.
+     * 
+     * @throws Exception
+     */
+    public void testFaultStopsPipeline() throws Exception {
+        Exchange exchange = template.send("direct:c", new Processor() {
+            public void process(Exchange exchange) {
+                exchange.getIn().setBody("Fault Message");
+            }
+        });
+        
+        // Check the fault..
+        assertEquals("Fault Message", exchange.getFault().getBody());
+        assertEquals(2, exchange.getFault().getHeader("copy-counter"));        
+        
+        // Check the out message. It should have only been processed once,
+        // since the fault should stop it from going to the next processor.
+        assertEquals(1, exchange.getOut().getHeader("copy-counter"));                
+    }
+
     @Override
     protected void setUp() throws Exception {
         super.setUp();
@@ -76,6 +140,11 @@
                 from("direct:x").process(processor);
                 from("direct:y").process(processor);
                 from("direct:z").process(processor);
+                
+                // Create a route that uses the InToOut processor 3 times. The copy-counter header should be == 3.
+                from("direct:b").process(new InToOut()).process(new InToOut()).process(new InToOut());
+                // Create a route that uses the InToFault processor. The last InToOut will not be called since the fault occurs before it.
+                from("direct:c").process(new InToOut()).process(new InToFault()).process(new InToOut());
             }
         };
     }