You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/17 14:24:21 UTC

svn commit: r955567 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/builder/ main/java/org/apache/camel/impl/converter/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/async/

Author: davsclaus
Date: Thu Jun 17 12:24:21 2010
New Revision: 955567

URL: http://svn.apache.org/viewvc?rev=955567&view=rev
Log:
CAMEL-2723: Error handlers now support async routing engine with task scheduled redelivery tasks.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryAndDelayTest.java
      - copied, changed from r955553, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=955567&r1=955566&r2=955567&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Thu Jun 17 12:24:21 2010
@@ -53,9 +53,9 @@ public class DeadLetterChannelBuilder ex
     public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
         validateDeadLetterUri(routeContext);
 
-        DeadLetterChannel answer = new DeadLetterChannel(processor, getLogger(), getOnRedelivery(), getRedeliveryPolicy(),
-                getHandledPolicy(), getExceptionPolicyStrategy(), getFailureProcessor(), getDeadLetterUri(),
-                isUseOriginalMessage());
+        DeadLetterChannel answer = new DeadLetterChannel(routeContext.getCamelContext(), processor, getLogger(),
+                getOnRedelivery(), getRedeliveryPolicy(), getHandledPolicy(), getExceptionPolicyStrategy(),
+                getFailureProcessor(), getDeadLetterUri(), isUseOriginalMessage());
         // configure error handler before we can use it
         configure(answer);
         return answer;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java?rev=955567&r1=955566&r2=955567&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java Thu Jun 17 12:24:21 2010
@@ -52,8 +52,8 @@ public class DefaultErrorHandlerBuilder 
     }
 
     public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
-        DefaultErrorHandler answer = new DefaultErrorHandler(processor, getLogger(), getOnRedelivery(), getRedeliveryPolicy(),
-                getHandledPolicy(), getExceptionPolicyStrategy());
+        DefaultErrorHandler answer = new DefaultErrorHandler(routeContext.getCamelContext(), processor, getLogger(),
+                getOnRedelivery(), getRedeliveryPolicy(), getHandledPolicy(), getExceptionPolicyStrategy());
         // configure error handler before we can use it
         configure(answer);
         return answer;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java?rev=955567&r1=955566&r2=955567&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java Thu Jun 17 12:24:21 2010
@@ -57,6 +57,15 @@ public class AsyncProcessorTypeConverter
             }
             return true;
         }
+
+        @Override
+        public String toString() {
+            if (processor != null) {
+                return processor.toString();
+            } else {
+                return "Processor is null";
+            }
+        }
     }
 
     public <T> T convertTo(Class<T> type, Object value) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=955567&r1=955566&r2=955567&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Thu Jun 17 12:24:21 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
@@ -34,6 +35,7 @@ public class DeadLetterChannel extends R
     /**
      * Creates the dead letter channel.
      *
+     * @param camelContext              the camel context
      * @param output                    outer processor that should use this dead letter channel
      * @param logger                    logger to use for logging failures and redelivery attempts
      * @param redeliveryProcessor       an optional processor to run before redelivery attempt
@@ -44,10 +46,10 @@ public class DeadLetterChannel extends R
      * @param deadLetterUri             an optional uri for logging purpose
      * @param useOriginalBodyPolicy     should the original IN body be moved to the dead letter queue or the current exchange IN body?
      */
-    public DeadLetterChannel(Processor output, Logger logger, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy,
+    public DeadLetterChannel(CamelContext camelContext, Processor output, Logger logger, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy,
                              Predicate handledPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy,
                              Processor deadLetter, String deadLetterUri, boolean useOriginalBodyPolicy) {
-        super(output, logger, redeliveryProcessor, redeliveryPolicy, handledPolicy, deadLetter, deadLetterUri, useOriginalBodyPolicy);
+        super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, handledPolicy, deadLetter, deadLetterUri, useOriginalBodyPolicy);
         setExceptionPolicy(exceptionPolicyStrategy);
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java?rev=955567&r1=955566&r2=955567&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java Thu Jun 17 12:24:21 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
@@ -31,6 +32,7 @@ public class DefaultErrorHandler extends
     /**
      * Creates the default error handler.
      *
+     * @param camelContext              the camel context
      * @param output                    outer processor that should use this default error handler
      * @param logger                    logger to use for logging failures and redelivery attempts
      * @param redeliveryProcessor       an optional processor to run before redelivery attempt
@@ -38,9 +40,9 @@ public class DefaultErrorHandler extends
      * @param handledPolicy             policy for handling failed exception that are moved to the dead letter queue
      * @param exceptionPolicyStrategy   strategy for onException handling
      */
-    public DefaultErrorHandler(Processor output, Logger logger, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy,
-                               Predicate handledPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy) {
-        super(output, logger, redeliveryProcessor, redeliveryPolicy, handledPolicy, null, null, false);
+    public DefaultErrorHandler(CamelContext camelContext, Processor output, Logger logger, Processor redeliveryProcessor,
+                               RedeliveryPolicy redeliveryPolicy, Predicate handledPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy) {
+        super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, handledPolicy, null, null, false);
         setExceptionPolicy(exceptionPolicyStrategy);
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=955567&r1=955566&r2=955567&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Thu Jun 17 12:24:21 2010
@@ -16,10 +16,14 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Message;
@@ -44,6 +48,8 @@ import org.apache.camel.util.ServiceHelp
  */
 public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor {
 
+    private static ScheduledExecutorService executorService;
+    protected final CamelContext camelContext;
     protected final Processor deadLetter;
     protected final String deadLetterUri;
     protected final Processor output;
@@ -54,6 +60,9 @@ public abstract class RedeliveryErrorHan
     protected final Logger logger;
     protected final boolean useOriginalMessagePolicy;
 
+    /**
+     * Contains the current redelivery data
+     */
     protected class RedeliveryData {
         boolean sync = true;
         int redeliveryCounter;
@@ -70,9 +79,67 @@ public abstract class RedeliveryErrorHan
         boolean useOriginalInMessage = useOriginalMessagePolicy;
     }
 
-    public RedeliveryErrorHandler(Processor output, Logger logger, Processor redeliveryProcessor,
+    /**
+     * Task which performs an asynchronous redelivery attempt, and is triggered by a
+     * {@link java.util.concurrent.ScheduledExecutorService} to avoid blocking any threads if the task
+     * has to be delayed before the redelivery attempt is performed.
+     */
+    private class RedeliveryTask implements Callable<Boolean> {
+
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+        private final RedeliveryData data;
+
+        public RedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
+            this.exchange = exchange;
+            this.callback = callback;
+            this.data = data;
+        }
+
+        public Boolean call() throws Exception {
+            // prepare for redelivery
+            prepareExchangeForRedelivery(exchange);
+
+            // let the onRedelivery processor be executed first
+            deliverToRedeliveryProcessor(exchange, data);
+
+            if (log.isTraceEnabled()) {
+                log.trace("Redelivering exchangeId: " + exchange.getExchangeId() + " -> " + outputAsync + " for Exchange: " + exchange);
+            }
+
+            // process the exchange (also redelivery)
+            boolean sync = outputAsync.process(exchange, new AsyncCallback() {
+                public void done(boolean sync) {
+                    if (log.isTraceEnabled()) {
+                        log.trace("Redelivering exchangeId: " + exchange.getExchangeId() + " done");
+                    }
+
+                    // this callback should only handle the async case
+                    if (sync) {
+                        return;
+                    }
+
+                    // mark we are in async mode now
+                    data.sync = false;
+                    // only process if the exchange hasn't failed
+                    // and it has not been handled by the error processor
+                    if (isDone(exchange)) {
+                        callback.done(sync);
+                        return;
+                    }
+                    // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
+                    processAsyncErrorHandler(exchange, callback, data);
+                }
+            });
+
+            return sync;
+        }
+    }
+
+    public RedeliveryErrorHandler(CamelContext camelContext, Processor output, Logger logger, Processor redeliveryProcessor,
                                   RedeliveryPolicy redeliveryPolicy, Predicate handledPolicy, Processor deadLetter,
                                   String deadLetterUri, boolean useOriginalMessagePolicy) {
+        this.camelContext = camelContext;
         this.redeliveryProcessor = redeliveryProcessor;
         this.deadLetter = deadLetter;
         this.output = output;
@@ -101,9 +168,11 @@ public abstract class RedeliveryErrorHan
     }
 
     /**
-     * Processes the exchange decorated with this dead letter channel.
+     * Process the exchange using redelivery error handling.
      */
     protected boolean processErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
+
+        // use looping to have redelivery attempts
         while (true) {
 
             // can we still run
@@ -133,7 +202,7 @@ public abstract class RedeliveryErrorHan
                 return sync;
             }
 
-            if (shouldRedeliver && data.redeliveryCounter > 0) {
+            if (data.redeliveryCounter > 0) {
                 // prepare for redelivery
                 prepareExchangeForRedelivery(exchange);
 
@@ -163,32 +232,43 @@ public abstract class RedeliveryErrorHan
 
                     // mark we are in async mode now
                     data.sync = false;
-                    // only process if the exchange hasn't failed
-                    // and it has not been handled by the error processor
+
+                    // if we are done then notify callback and exit
                     if (isDone(exchange)) {
                         callback.done(sync);
                         return;
                     }
+
                     // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
+                    // method which takes care of this in an asynchronous manner
                     processAsyncErrorHandler(exchange, callback, data);
                 }
             });
+
             if (!sync) {
                 // the remainder of the Exchange is being processed asynchronously so we should return
                 return false;
             }
+            // we continue to route synchronously
 
-            // we route synchronously
+            // if we are done then notify callback and exit
             boolean done = isDone(exchange);
             if (done) {
                 callback.done(true);
                 return true;
             }
+
             // error occurred so loop back around.....
         }
-
     }
 
+    /**
+     * This logic is only executed if we have to retry redelivery asynchronously, which has to be done from the callback.
+     * <p/>
+     * Therefore the logic is a bit different from the synchronous <tt>processErrorHandler</tt> method, which can use
+     * a loop based redelivery technique. However this means that these two methods have to be kept in <b>sync</b>
+     * in terms of logic.
+     */
     protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
         // can we still run
         if (!isRunAllowed()) {
@@ -217,48 +297,24 @@ public abstract class RedeliveryErrorHan
             return;
         }
 
-        // TODO: Use a scheduler to schedule the redelivery delay
-        // which contains the outputAsync task being executed
-        // we can optimize and only use the scheduler if there is a delay
-        if (shouldRedeliver && data.redeliveryCounter > 0) {
-            // prepare for redelivery
-            prepareExchangeForRedelivery(exchange);
-
-            // if we are redelivering then sleep before trying again
-            // wait until we should redeliver
-            try {
-                data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay, data.redeliveryCounter);
-            } catch (InterruptedException e) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
+        if (data.redeliveryCounter > 0) {
+            // let the RedeliveryTask be the logic which tries to redeliver the Exchange, so we can use a scheduler to
+            // have it executed in the future, or immediately
+            RedeliveryTask task = new RedeliveryTask(exchange, callback, data);
+
+            // calculate the redelivery delay
+            data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
+            if (data.redeliveryDelay > 0) {
+                // schedule the redelivery task
+                if (log.isTraceEnabled()) {
+                    log.trace("Scheduling redelivery task to run in " + data.redeliveryDelay + " millis for exchangeId: " + exchange.getExchangeId());
                 }
-                return;
+                executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
+            } else {
+                // execute the task immediately
+                executorService.submit(task);
             }
-
-            // letting onRedeliver be executed
-            deliverToRedeliveryProcessor(exchange, data);
         }
-
-        // process the exchange (also redelivery)
-        outputAsync.process(exchange, new AsyncCallback() {
-            public void done(boolean sync) {
-                // this callback should only handle the async case
-                if (sync) {
-                    return;
-                }
-
-                // mark we are in async mode now
-                data.sync = false;
-                // only process if the exchange hasn't failed
-                // and it has not been handled by the error processor
-                if (isDone(exchange)) {
-                    callback.done(sync);
-                    return;
-                }
-                // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
-                processAsyncErrorHandler(exchange, callback, data);
-            }
-        });
     }
 
     /**
@@ -277,9 +333,14 @@ public abstract class RedeliveryErrorHan
         // only done if the exchange hasn't failed
         // and it has not been handled by the failure processor
         // or we are exhausted
-        return exchange.getException() == null
-            || ExchangeHelper.isFailureHandled(exchange)
-            || ExchangeHelper.isRedeliveryExhausted(exchange);
+        boolean answer = exchange.getException() == null
+                || ExchangeHelper.isFailureHandled(exchange)
+                || ExchangeHelper.isRedeliveryExhausted(exchange);
+
+        if (log.isTraceEnabled()) {
+            log.trace("Is exchangeId: " + exchange.getExchangeId() + " done? " + answer);
+        }
+        return answer;
     }
 
     /**
@@ -460,7 +521,7 @@ public abstract class RedeliveryErrorHan
                     }
                     try {
                         prepareExchangeAfterFailure(exchange, data);
-                        // fire event as we had a failure processor to handle it
+                        // fire event as we had a failure processor to handle it, for which there is an event
                         boolean deadLetterChannel = processor == data.deadLetterProcessor && data.deadLetterProcessor != null;
                         EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel);
                     } finally {
@@ -501,8 +562,8 @@ public abstract class RedeliveryErrorHan
         boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
         if (alreadySet) {
             boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class);
-            if (log.isDebugEnabled()) {
-                log.debug("This exchange has already been marked for handling: " + handled);
+            if (log.isTraceEnabled()) {
+                log.trace("This exchange has already been marked for handling: " + handled);
             }
             if (handled) {
                 exchange.setException(null);
@@ -516,19 +577,19 @@ public abstract class RedeliveryErrorHan
         }
 
         if (shouldHandled(exchange, data)) {
-            if (log.isDebugEnabled()) {
-                log.debug("This exchange is handled so its marked as not failed: " + exchange);
+            if (log.isTraceEnabled()) {
+                log.trace("This exchange is handled so its marked as not failed: " + exchange);
             }
             exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
         } else if (shouldContinue(exchange, data)) {
-            if (log.isDebugEnabled()) {
-                log.debug("This exchange is continued: " + exchange);
+            if (log.isTraceEnabled()) {
+                log.trace("This exchange is continued: " + exchange);
             }
             // okay we want to continue then prepare the exchange for that as well
             prepareExchangeForContinue(exchange, data);
         } else {
-            if (log.isDebugEnabled()) {
-                log.debug("This exchange is not handled or continued so its marked as failed: " + exchange);
+            if (log.isTraceEnabled()) {
+                log.trace("This exchange is not handled or continued so its marked as failed: " + exchange);
             }
             // exception not handled, put exception back in the exchange
             exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
@@ -681,6 +742,14 @@ public abstract class RedeliveryErrorHan
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startServices(output, outputAsync, deadLetter);
+        // use pool size from default profile
+        int poolSize = camelContext.getExecutorServiceStrategy().getDefaultThreadPoolProfile().getPoolSize();
+        // use a shared scheduler 
+        if (executorService == null || executorService.isShutdown()) {
+            // camel context will shut down the executor when it shuts down, so no need to shut it down when stopping
+            executorService = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this,
+                    getClass().getSimpleName() + "-AsyncRedeliveryTask", poolSize);
+        }
     }
 
     @Override

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryAndDelayTest.java (from r955553, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryAndDelayTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryAndDelayTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryTest.java&r1=955553&r2=955567&rev=955567&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncFailureProcessorWithRedeliveryAndDelayTest.java Thu Jun 17 12:24:21 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
 /**
  * @version $Revision$
  */
-public class AsyncFailureProcessorWithRedeliveryTest extends ContextTestSupport {
+public class AsyncFailureProcessorWithRedeliveryAndDelayTest extends ContextTestSupport {
 
     private static String beforeThreadName;
     private static String afterThreadName;
@@ -50,7 +50,7 @@ public class AsyncFailureProcessorWithRe
                 context.addComponent("async", new MyAsyncComponent());
 
                 // use redelivery up till 5 times
-                errorHandler(defaultErrorHandler().maximumRedeliveries(5));
+                errorHandler(defaultErrorHandler().maximumRedeliveries(5).redeliveryDelay(1000));
 
                 from("direct:start")
                         .to("mock:before")