You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/06/16 13:33:07 UTC

svn commit: r1350909 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/builder/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/te...

Author: davsclaus
Date: Sat Jun 16 11:33:06 2012
New Revision: 1350909

URL: http://svn.apache.org/viewvc?rev=1350909&view=rev
Log:
CAMEL-5372: Fixed issue so that the error handler thread pool for redelivery is per CamelContext.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/issues/RedeliveryErrorHandlerAsyncDelayedTwoCamelContextIssueTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=1350909&r1=1350908&r2=1350909&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Sat Jun 16 11:33:06 2012
@@ -20,6 +20,7 @@ import java.io.InputStream;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.builder.ErrorHandlerBuilder;
@@ -767,6 +768,12 @@ public interface CamelContext extends Su
     void setErrorHandlerBuilder(ErrorHandlerFactory errorHandlerBuilder);
 
     /**
+     * Gets the default shared thread pool that error handlers
+     * use for asynchronous redelivery tasks.
+     */
+    ScheduledExecutorService getErrorHandlerExecutorService();
+
+    /**
      * Sets the data formats that can be referenced in the routes.
      *
      * @param dataFormats the data formats

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=1350909&r1=1350908&r2=1350909&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Sat Jun 16 11:33:06 2012
@@ -55,7 +55,7 @@ public class DeadLetterChannelBuilder ex
 
         DeadLetterChannel answer = new DeadLetterChannel(routeContext.getCamelContext(), processor, getLogger(), getOnRedelivery(), 
                 getRedeliveryPolicy(), getExceptionPolicyStrategy(), getFailureProcessor(), getDeadLetterUri(), isUseOriginalMessage(),
-                getRetryWhilePolicy(routeContext.getCamelContext()), getExecutorServiceRef());
+                getRetryWhilePolicy(routeContext.getCamelContext()), getExecutorService(routeContext.getCamelContext()));
         // configure error handler before we can use it
         configure(routeContext, answer);
         return answer;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java?rev=1350909&r1=1350908&r2=1350909&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java Sat Jun 16 11:33:06 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.builder;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Expression;
@@ -24,8 +26,10 @@ import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.DefaultErrorHandler;
 import org.apache.camel.processor.RedeliveryPolicy;
+import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.Language;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.util.CamelLogger;
 import org.apache.camel.util.ExpressionToPredicateAdapter;
 import org.slf4j.LoggerFactory;
@@ -48,13 +52,14 @@ public class DefaultErrorHandlerBuilder 
     protected boolean useOriginalMessage;
     protected boolean asyncDelayedRedelivery;
     protected String executorServiceRef;
+    protected ScheduledExecutorService executorService;
 
     public DefaultErrorHandlerBuilder() {
     }
 
     public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
         DefaultErrorHandler answer = new DefaultErrorHandler(routeContext.getCamelContext(), processor, getLogger(), getOnRedelivery(), 
-            getRedeliveryPolicy(), getExceptionPolicyStrategy(), getRetryWhilePolicy(routeContext.getCamelContext()), getExecutorServiceRef());
+            getRedeliveryPolicy(), getExceptionPolicyStrategy(), getRetryWhilePolicy(routeContext.getCamelContext()), getExecutorService(routeContext.getCamelContext()));
         // configure error handler before we can use it
         configure(routeContext, answer);
         return answer;
@@ -64,7 +69,6 @@ public class DefaultErrorHandlerBuilder 
         return false;
     }
 
-
     // Builder methods
     // -------------------------------------------------------------------------
     public DefaultErrorHandlerBuilder backOffMultiplier(double backOffMultiplier) {
@@ -401,6 +405,27 @@ public class DefaultErrorHandlerBuilder 
         return new CamelLogger(LoggerFactory.getLogger(DefaultErrorHandler.class), LoggingLevel.ERROR);
     }
 
+    protected synchronized ScheduledExecutorService getExecutorService(CamelContext camelContext) {
+        if (executorService == null || executorService.isShutdown()) {
+            // camel context will shutdown the executor when it shutdown so no need to shut it down when stopping
+            if (executorServiceRef != null) {
+                executorService = camelContext.getRegistry().lookup(executorServiceRef, ScheduledExecutorService.class);
+                if (executorService == null) {
+                    ExecutorServiceManager manager = camelContext.getExecutorServiceManager();
+                    ThreadPoolProfile profile = manager.getThreadPoolProfile(executorServiceRef);
+                    executorService = manager.newScheduledThreadPool(this, executorServiceRef, profile);
+                }
+                if (executorService == null) {
+                    throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
+                }
+            } else {
+                // use default shared thread pool for error handlers
+                executorService = camelContext.getErrorHandlerExecutorService();
+            }
+        }
+        return executorService;
+    }
+
     @Override
     public String toString() {
         return "DefaultErrorHandlerBuilder";

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=1350909&r1=1350908&r2=1350909&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Sat Jun 16 11:33:06 2012
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -112,6 +113,7 @@ import org.apache.camel.spi.RouteContext
 import org.apache.camel.spi.RouteStartupOrder;
 import org.apache.camel.spi.ServicePool;
 import org.apache.camel.spi.ShutdownStrategy;
+import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.spi.TypeConverterRegistry;
 import org.apache.camel.spi.UuidGenerator;
 import org.apache.camel.support.ServiceSupport;
@@ -176,6 +178,7 @@ public class DefaultCamelContext extends
     private Boolean useBreadcrumb = Boolean.TRUE;
     private Long delay;
     private ErrorHandlerFactory errorHandlerBuilder;
+    private ScheduledExecutorService errorHandlerExecutorService;
     private Map<String, DataFormatDefinition> dataFormats = new HashMap<String, DataFormatDefinition>();
     private DataFormatResolver dataFormatResolver = new DefaultDataFormatResolver();
     private Map<String, String> properties = new HashMap<String, String>();
@@ -1268,6 +1271,10 @@ public class DefaultCamelContext extends
         this.errorHandlerBuilder = errorHandlerBuilder;
     }
 
+    public ScheduledExecutorService getErrorHandlerExecutorService() {
+        return errorHandlerExecutorService;
+    }
+
     public void setProducerServicePool(ServicePool<Endpoint, Producer> producerServicePool) {
         this.producerServicePool = producerServicePool;
     }
@@ -1527,6 +1534,11 @@ public class DefaultCamelContext extends
 
         startServices(components.values());
 
+        // setup default thread pool for error handler
+        if (errorHandlerExecutorService == null || errorHandlerExecutorService.isShutdown()) {
+            errorHandlerExecutorService = getExecutorServiceManager().newDefaultScheduledThreadPool(this, "ErrorHandlerRedeliveryTask");
+        }
+
         // start the route definitions before the routes is started
         startRouteDefinitions(routeDefinitions);
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=1350909&r1=1350908&r2=1350909&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Sat Jun 16 11:33:06 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
@@ -48,12 +50,13 @@ public class DeadLetterChannel extends R
      * @param deadLetterUri             an optional uri for logging purpose
      * @param useOriginalBodyPolicy     should the original IN body be moved to the dead letter queue or the current exchange IN body?
      * @param retryWhile                retry while
-     * @param executorServiceRef        reference to a {@link java.util.concurrent.ScheduledExecutorService} to be used for redelivery thread pool. Can be <tt>null</tt>.
+     * @param executorService           the {@link java.util.concurrent.ScheduledExecutorService} to be used for redelivery thread pool. Can be <tt>null</tt>.
      */
     public DeadLetterChannel(CamelContext camelContext, Processor output, CamelLogger logger, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy,
-            ExceptionPolicyStrategy exceptionPolicyStrategy, Processor deadLetter, String deadLetterUri, boolean useOriginalBodyPolicy, Predicate retryWhile, String executorServiceRef) {
+            ExceptionPolicyStrategy exceptionPolicyStrategy, Processor deadLetter, String deadLetterUri, boolean useOriginalBodyPolicy, Predicate retryWhile,
+            ScheduledExecutorService executorService) {
 
-        super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, deadLetter, deadLetterUri, useOriginalBodyPolicy, retryWhile, executorServiceRef);
+        super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, deadLetter, deadLetterUri, useOriginalBodyPolicy, retryWhile, executorService);
         setExceptionPolicy(exceptionPolicyStrategy);
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java?rev=1350909&r1=1350908&r2=1350909&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java Sat Jun 16 11:33:06 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
@@ -40,12 +42,12 @@ public class DefaultErrorHandler extends
      * @param redeliveryPolicy          policy for redelivery
      * @param exceptionPolicyStrategy   strategy for onException handling
      * @param retryWhile                retry while
-     * @param executorServiceRef        reference to a {@link java.util.concurrent.ScheduledExecutorService} to be used for redelivery thread pool. Can be <tt>null</tt>.
+     * @param executorService           the {@link java.util.concurrent.ScheduledExecutorService} to be used for redelivery thread pool. Can be <tt>null</tt>.
      */
     public DefaultErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger, Processor redeliveryProcessor,
-            RedeliveryPolicy redeliveryPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy, Predicate retryWhile, String executorServiceRef) {
+            RedeliveryPolicy redeliveryPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy, Predicate retryWhile, ScheduledExecutorService executorService) {
 
-        super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, null, null, false, retryWhile, executorServiceRef);
+        super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, null, null, false, retryWhile, executorService);
         setExceptionPolicy(exceptionPolicyStrategy);
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1350909&r1=1350908&r2=1350909&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Sat Jun 16 11:33:06 2012
@@ -30,9 +30,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.model.OnExceptionDefinition;
-import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.SubUnitOfWorkCallback;
-import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -55,8 +53,7 @@ import org.apache.camel.util.ServiceHelp
  */
 public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor {
 
-    private static ScheduledExecutorService executorService;
-    protected final String executorServiceRef;
+    protected ScheduledExecutorService executorService;
     protected final CamelContext camelContext;
     protected final Processor deadLetter;
     protected final String deadLetterUri;
@@ -179,7 +176,7 @@ public abstract class RedeliveryErrorHan
 
     public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
             Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter,
-            String deadLetterUri, boolean useOriginalMessagePolicy, Predicate retryWhile, String executorServiceRef) {
+            String deadLetterUri, boolean useOriginalMessagePolicy, Predicate retryWhile, ScheduledExecutorService executorService) {
 
         ObjectHelper.notNull(camelContext, "CamelContext", this);
         ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this);
@@ -194,7 +191,7 @@ public abstract class RedeliveryErrorHan
         this.deadLetterUri = deadLetterUri;
         this.useOriginalMessagePolicy = useOriginalMessagePolicy;
         this.retryWhilePolicy = retryWhile;
-        this.executorServiceRef = executorServiceRef;
+        this.executorService = executorService;
     }
 
     public boolean supportTransacted() {
@@ -601,7 +598,7 @@ public abstract class RedeliveryErrorHan
         // clear rollback flags
         exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
 
-        // TODO: We may want to store these as state on RedelieryData so we keep them in case end user messes with Exchange
+        // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange
         // and then put these on the exchange when doing a redelivery / fault processor
 
         // preserve these headers
@@ -1050,23 +1047,10 @@ public abstract class RedeliveryErrorHan
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startServices(output, outputAsync, deadLetter);
-        // use a shared scheduler
-        if (executorService == null || executorService.isShutdown()) {
-            // camel context will shutdown the executor when it shutdown so no need to shut it down when stopping
-            if (executorServiceRef != null) {
-                executorService = camelContext.getRegistry().lookup(executorServiceRef, ScheduledExecutorService.class);
-                if (executorService == null) {
-                    ExecutorServiceManager manager = camelContext.getExecutorServiceManager();
-                    ThreadPoolProfile profile = manager.getThreadPoolProfile(executorServiceRef);
-                    executorService = manager.newScheduledThreadPool(this, executorServiceRef, profile);
-                }
-                if (executorService == null) {
-                    throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
-                }
-            } else {
-                // create a default scheduled thread pool
-                executorService = camelContext.getExecutorServiceManager().newDefaultScheduledThreadPool(this, "ErrorHandlerRedeliveryTask");
-            }
+
+        if (executorService == null) {
+            // use default shared executor service
+            executorService = camelContext.getErrorHandlerExecutorService();
         }
 
         // determine if redeliver is enabled or not

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java?rev=1350909&r1=1350908&r2=1350909&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java Sat Jun 16 11:33:06 2012
@@ -50,8 +50,9 @@ public class MultipleLifecycleStrategyTe
         context.removeComponent("log");
         context.stop();
 
-        List<String> expectedEvents = Arrays.asList("onThreadPoolAdd", "onContextStart", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd",
-             "onServiceAdd", "onServiceAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop");
+        List<String> expectedEvents = Arrays.asList("onThreadPoolAdd", "onContextStart", "onServiceAdd", "onServiceAdd",
+                "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd",
+                "onThreadPoolAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop");
         
         assertEquals(expectedEvents, dummy1.getEvents());
         assertEquals(expectedEvents, dummy2.getEvents());

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/RedeliveryErrorHandlerAsyncDelayedTwoCamelContextIssueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/RedeliveryErrorHandlerAsyncDelayedTwoCamelContextIssueTest.java?rev=1350909&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/RedeliveryErrorHandlerAsyncDelayedTwoCamelContextIssueTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/RedeliveryErrorHandlerAsyncDelayedTwoCamelContextIssueTest.java Sat Jun 16 11:33:06 2012
@@ -0,0 +1,71 @@
+package org.apache.camel.issues;
+
+import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RedeliveryErrorHandlerAsyncDelayedTwoCamelContextIssueTest {
+
+    @Test
+    public void shouldNotBreakRedeliveriesOfSecondContextAfterFirstBeingStopped() throws Exception {
+        DefaultCamelContext context1 = createContext();
+        ProducerTemplate producer1 = context1.createProducerTemplate();
+        ConsumerTemplate consumer1 = context1.createConsumerTemplate();
+        context1.start();
+        producer1.sendBody("seda://input", "Hey1");
+        Exchange ex1 = consumer1.receive("seda://output", 5000);
+
+        DefaultCamelContext context2 = createContext();
+        ProducerTemplate producer2 = context2.createProducerTemplate();
+        ConsumerTemplate consumer2 = context2.createConsumerTemplate();
+        context2.start();
+
+        // now stop 1, and see that 2 is still working
+        context1.stop();
+
+        producer2.sendBody("seda://input", "Hey2");
+        Exchange ex2 = consumer2.receive("seda://output", 5000);
+
+        Assert.assertNotNull(ex1);
+        Assert.assertEquals("Hey1", ex1.getIn().getBody());
+        Assert.assertNotNull(ex2);
+        Assert.assertEquals("Hey2", ex2.getIn().getBody());
+
+        context2.stop();
+    }
+
+    private DefaultCamelContext createContext() throws Exception {
+        DefaultCamelContext context = new DefaultCamelContext();
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class)
+                    .redeliveryDelay(100)
+                    .maximumRedeliveries(5)
+                    .maximumRedeliveryDelay(1000)
+                    .backOffMultiplier(1)
+                    .asyncDelayedRedelivery();
+
+                from("seda://input")
+                    .bean(ProblematicBean.class)
+                    .to("seda://output");
+            }
+        });
+        return context;
+    }
+
+    public static class ProblematicBean {
+        int counter = 0;
+
+        public void doSomething() {
+            if (counter++ < 2) {
+                throw new RuntimeException();
+            }
+        }
+    }
+
+}

Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java?rev=1350909&r1=1350908&r2=1350909&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java Sat Jun 16 11:33:06 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.spring.spi;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -56,15 +58,15 @@ public class TransactionErrorHandler ext
      * @param exceptionPolicyStrategy strategy for onException handling
      * @param transactionTemplate     the transaction template
      * @param retryWhile              retry while
-     * @param executorServiceRef      reference to a {@link java.util.concurrent.ScheduledExecutorService} to be used for redelivery thread pool. Can be <tt>null</tt>.
+     * @param executorService         the {@link java.util.concurrent.ScheduledExecutorService} to be used for redelivery thread pool. Can be <tt>null</tt>.
      * @param rollbackLoggingLevel    logging level to use for logging transaction rollback occurred
      */
     public TransactionErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger, 
             Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy,
-            TransactionTemplate transactionTemplate, Predicate retryWhile, String executorServiceRef,
+            TransactionTemplate transactionTemplate, Predicate retryWhile, ScheduledExecutorService executorService,
             LoggingLevel rollbackLoggingLevel) {
 
-        super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, null, null, false, retryWhile, executorServiceRef);
+        super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, null, null, false, retryWhile, executorService);
         setExceptionPolicy(exceptionPolicyStrategy);
         this.transactionTemplate = transactionTemplate;
         this.rollbackLoggingLevel = rollbackLoggingLevel;

Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java?rev=1350909&r1=1350908&r2=1350909&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java Sat Jun 16 11:33:06 2012
@@ -109,7 +109,7 @@ public class TransactionErrorHandlerBuil
 
         TransactionErrorHandler answer = new TransactionErrorHandler(routeContext.getCamelContext(), processor,
             getLogger(), getOnRedelivery(), getRedeliveryPolicy(), getExceptionPolicyStrategy(), transactionTemplate, 
-            getRetryWhilePolicy(routeContext.getCamelContext()), getExecutorServiceRef(), getRollbackLoggingLevel());
+            getRetryWhilePolicy(routeContext.getCamelContext()), getExecutorService(routeContext.getCamelContext()), getRollbackLoggingLevel());
         // configure error handler before we can use it
         configure(routeContext, answer);
         return answer;