You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2018/11/22 16:33:25 UTC

[camel] 08/11: Fix async engine / tests

This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6094865d0d60b0dd0f14db6e6a9e24b7a6684cf7
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Wed Nov 21 11:40:35 2018 +0100

    Fix async engine / tests
---
 .../apache/camel/impl/AbstractCamelContext.java    | 87 ++++++++++++----------
 .../org/apache/camel/processor/LoopProcessor.java  |  6 +-
 .../apache/camel/processor/MulticastProcessor.java |  6 +-
 .../java/org/apache/camel/processor/Pipeline.java  |  9 ++-
 .../camel/processor/RedeliveryErrorHandler.java    |  6 +-
 .../apache/camel/processor/WireTapProcessor.java   | 21 +++---
 .../org/apache/camel/support/ReactiveHelper.java   | 22 ++++--
 .../component/beanstalk/BeanstalkProducer.java     |  5 +-
 .../disruptor/DisruptorConcurrentTest.java         |  4 +-
 .../docker/producer/AsyncDockerProducer.java       |  5 +-
 .../file/remote/FromFtpAsyncProcessTest.java       |  8 +-
 .../FtpShutdownCompleteCurrentTaskOnlyTest.java    |  2 +-
 .../apache/camel/component/grpc/GrpcProducer.java  |  7 +-
 .../ignite/cache/IgniteCacheProducer.java          |  3 +-
 .../ignite/compute/IgniteComputeProducer.java      |  9 ++-
 .../ignite/idgen/IgniteIdGenProducer.java          |  5 +-
 .../ignite/messaging/IgniteMessagingProducer.java  |  4 +-
 .../ignite/queue/IgniteQueueProducer.java          |  5 +-
 .../component/ignite/set/IgniteSetProducer.java    |  5 +-
 .../component/netty4/NettyRedeliveryTest.java      |  5 +-
 20 files changed, 126 insertions(+), 98 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/impl/AbstractCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/AbstractCamelContext.java
index 00aa494..146edf1 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/AbstractCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/AbstractCamelContext.java
@@ -234,8 +234,6 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
     private Boolean allowUseOriginalMessage = Boolean.FALSE;
     private Long delay;
     private ErrorHandlerFactory errorHandlerBuilder;
-    private final Object errorHandlerExecutorServiceLock = new Object();
-    private ScheduledExecutorService errorHandlerExecutorService;
     private Map<String, DataFormatDefinition> dataFormats = new HashMap<>();
     private Map<String, String> globalOptions = new HashMap<>();
     private PropertiesComponent propertiesComponent;
@@ -243,10 +241,10 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
     private final Map<String, RouteService> routeServices = new LinkedHashMap<>();
     private final Map<String, RouteService> suspendedRouteServices = new LinkedHashMap<>();
 
+    private final Object lock = new Object();
     private volatile CamelContextNameStrategy nameStrategy;
     private volatile ManagementNameStrategy managementNameStrategy;
-    private Registry registry;
-
+    private volatile Registry registry;
     private volatile TypeConverter typeConverter;
     private volatile TypeConverterRegistry typeConverterRegistry;
     private volatile Injector injector;
@@ -276,6 +274,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
     private volatile UnitOfWorkFactory unitOfWorkFactory;
     private volatile ReloadStrategy reloadStrategy;
     private volatile RouteController routeController;
+    private volatile ScheduledExecutorService errorHandlerExecutorService;
 
     private TransformerRegistry<TransformerKey> transformerRegistry;
     private ValidatorRegistry<ValidatorKey> validatorRegistry;
@@ -413,7 +412,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public CamelContextNameStrategy getNameStrategy() {
         if (nameStrategy == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (nameStrategy == null) {
                     setNameStrategy(createCamelContextNameStrategy());
                 }
@@ -428,7 +427,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public ManagementNameStrategy getManagementNameStrategy() {
         if (managementNameStrategy == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (managementNameStrategy == null) {
                     setManagementNameStrategy(createManagementNameStrategy());
                 }
@@ -913,7 +912,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
     @Override
     public RouteController getRouteController() {
         if (routeController == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (routeController == null) {
                     setRouteController(createRouteController());
                 }
@@ -2456,7 +2455,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public TypeConverter getTypeConverter() {
         if (typeConverter == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (typeConverter == null) {
                     setTypeConverter(createTypeConverter());
                 }
@@ -2471,7 +2470,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public TypeConverterRegistry getTypeConverterRegistry() {
         if (typeConverterRegistry == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (typeConverterRegistry == null) {
                     setTypeConverterRegistry(createTypeConverterRegistry());
                 }
@@ -2486,7 +2485,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public Injector getInjector() {
         if (injector == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (injector == null) {
                     setInjector(createInjector());
                 }
@@ -2509,7 +2508,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public ComponentResolver getComponentResolver() {
         if (componentResolver == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (componentResolver == null) {
                     setComponentResolver(createComponentResolver());
                 }
@@ -2524,7 +2523,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public LanguageResolver getLanguageResolver() {
         if (languageResolver == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (languageResolver == null) {
                     setLanguageResolver(createLanguageResolver());
                 }
@@ -2547,7 +2546,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public Registry getRegistry() {
         if (registry == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (registry == null) {
                     setRegistry(createRegistry());
                 }
@@ -2875,22 +2874,32 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
     }
 
     public ScheduledExecutorService getErrorHandlerExecutorService() {
-        synchronized (errorHandlerExecutorServiceLock) {
-            if (errorHandlerExecutorService == null) {
-                // setup default thread pool for error handler
-                errorHandlerExecutorService = getExecutorServiceManager().newDefaultScheduledThreadPool("ErrorHandlerRedeliveryThreadPool", "ErrorHandlerRedeliveryTask");
+        if (errorHandlerExecutorService == null) {
+            synchronized (lock) {
+                if (errorHandlerExecutorService == null) {
+                    // setup default thread pool for error handler
+                    errorHandlerExecutorService = createErrorHandlerExecutorService();
+                }
             }
         }
         return errorHandlerExecutorService;
     }
 
+    protected ScheduledExecutorService createErrorHandlerExecutorService() {
+        return getExecutorServiceManager().newDefaultScheduledThreadPool("ErrorHandlerRedeliveryThreadPool", "ErrorHandlerRedeliveryTask");
+    }
+
+    public void setErrorHandlerExecutorService(ScheduledExecutorService errorHandlerExecutorService) {
+        this.errorHandlerExecutorService = errorHandlerExecutorService;
+    }
+
     public void setProducerServicePool(ServicePool<Producer> producerServicePool) {
         this.producerServicePool = doAddService(producerServicePool);
     }
 
     public ServicePool<Producer> getProducerServicePool() {
         if (producerServicePool == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (producerServicePool == null) {
                     setProducerServicePool(createProducerServicePool());
                 }
@@ -2901,7 +2910,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public ServicePool<PollingConsumer> getPollingConsumerServicePool() {
         if (pollingConsumerServicePool == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (pollingConsumerServicePool == null) {
                     setPollingConsumerServicePool(createPollingConsumerServicePool());
                 }
@@ -2916,7 +2925,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public UnitOfWorkFactory getUnitOfWorkFactory() {
         if (unitOfWorkFactory == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (unitOfWorkFactory == null) {
                     setUnitOfWorkFactory(createUnitOfWorkFactory());
                 }
@@ -2954,7 +2963,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public String getVersion() {
         if (version == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (version == null) {
                     version = doGetVersion();
                 }
@@ -4121,7 +4130,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public FactoryFinder getDefaultFactoryFinder() {
         if (defaultFactoryFinder == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (defaultFactoryFinder == null) {
                     defaultFactoryFinder = getFactoryFinderResolver().resolveDefaultFactoryFinder(getClassResolver());
                 }
@@ -4132,7 +4141,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public FactoryFinderResolver getFactoryFinderResolver() {
         if (factoryFinderResolver == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (factoryFinderResolver == null) {
                     factoryFinderResolver = createFactoryFinderResolver();
                 }
@@ -4155,7 +4164,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public ClassResolver getClassResolver() {
         if (classResolver == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (classResolver == null) {
                     setClassResolver(createClassResolver());
                 }
@@ -4170,7 +4179,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public PackageScanClassResolver getPackageScanClassResolver() {
         if (packageScanClassResolver == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (packageScanClassResolver == null) {
                     setPackageScanClassResolver(createPackageScanClassResolver());
                 }
@@ -4193,7 +4202,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public ModelJAXBContextFactory getModelJAXBContextFactory() {
         if (modelJAXBContextFactory == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (modelJAXBContextFactory == null) {
                     setModelJAXBContextFactory(createModelJAXBContextFactory());
                 }
@@ -4208,7 +4217,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public NodeIdFactory getNodeIdFactory() {
         if (nodeIdFactory == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (nodeIdFactory == null) {
                     setNodeIdFactory(createNodeIdFactory());
                 }
@@ -4223,7 +4232,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public ManagementStrategy getManagementStrategy() {
         if (managementStrategy == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (managementStrategy == null) {
                     setManagementStrategy(createManagementStrategy());
                 }
@@ -4254,7 +4263,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public InflightRepository getInflightRepository() {
         if (inflightRepository == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (inflightRepository == null) {
                     setInflightRepository(createInflightRepository());
                 }
@@ -4269,7 +4278,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
         if (asyncProcessorAwaitManager == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (asyncProcessorAwaitManager == null) {
                     setAsyncProcessorAwaitManager(createAsyncProcessorAwaitManager());
                 }
@@ -4341,7 +4350,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public DataFormatResolver getDataFormatResolver() {
         if (dataFormatResolver == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (dataFormatResolver == null) {
                     setDataFormatResolver(createDataFormatResolver());
                 }
@@ -4396,7 +4405,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public ShutdownStrategy getShutdownStrategy() {
         if (shutdownStrategy == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (shutdownStrategy == null) {
                     setShutdownStrategy(createShutdownStrategy());
                 }
@@ -4435,7 +4444,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public ExecutorServiceManager getExecutorServiceManager() {
         if (executorServiceManager == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (executorServiceManager == null) {
                     setExecutorServiceManager(createExecutorServiceManager());
                 }
@@ -4451,7 +4460,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public ProcessorFactory getProcessorFactory() {
         if (processorFactory == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (processorFactory == null) {
                     setProcessorFactory(createProcessorFactory());
                 }
@@ -4466,7 +4475,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public MessageHistoryFactory getMessageHistoryFactory() {
         if (messageHistoryFactory == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (messageHistoryFactory == null) {
                     setMessageHistoryFactory(createMessageHistoryFactory());
                 }
@@ -4492,7 +4501,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public UuidGenerator getUuidGenerator() {
         if (uuidGenerator == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (uuidGenerator == null) {
                     setUuidGenerator(createUuidGenerator());
                 }
@@ -4507,7 +4516,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public StreamCachingStrategy getStreamCachingStrategy() {
         if (streamCachingStrategy == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (streamCachingStrategy == null) {
                     setStreamCachingStrategy(createStreamCachingStrategy());
                 }
@@ -4522,7 +4531,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
 
     public RestRegistry getRestRegistry() {
         if (restRegistry ==  null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (restRegistry == null) {
                     setRestRegistry(createRestRegistry());
                 }
@@ -4616,7 +4625,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
     @Override
     public HeadersMapFactory getHeadersMapFactory() {
         if (headersMapFactory == null) {
-            synchronized (this) {
+            synchronized (lock) {
                 if (headersMapFactory == null) {
                     setHeadersMapFactory(createHeadersMapFactory());
                 }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 92614a0..b6aecbc 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -52,7 +52,11 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
 
             LoopState state = new LoopState(exchange, callback);
 
-            ReactiveHelper.scheduleMain(state);
+            if (exchange.isTransacted()) {
+                ReactiveHelper.scheduleSync(state);
+            } else {
+                ReactiveHelper.scheduleMain(state);
+            }
             return false;
 
         } catch (Exception e) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index df08df0..ccb6e23 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -223,7 +223,11 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
         if (isParallelProcessing()) {
             executorService.submit(() -> ReactiveHelper.schedule(state));
         } else {
-            ReactiveHelper.scheduleMain(state);
+            if (exchange.isTransacted()) {
+                ReactiveHelper.scheduleSync(state);
+            } else {
+                ReactiveHelper.scheduleMain(state);
+            }
         }
 
         // the remainder of the multicast will be completed async
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
index 0d7d4e5..90bee9c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -81,8 +81,13 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        ReactiveHelper.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
-                "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
+        if (exchange.isTransacted()) {
+            ReactiveHelper.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
+                    "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
+        } else {
+            ReactiveHelper.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
+                    "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
+        }
         return false;
     }
 
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index 1a87121..1577ad1 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -152,7 +152,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         // Create the redelivery state object for this exchange
         RedeliveryState state = new RedeliveryState(exchange, callback);
         // Run it
-        ReactiveHelper.scheduleMain(state);
+        if (exchange.isTransacted()) {
+            ReactiveHelper.scheduleSync(state);
+        } else {
+            ReactiveHelper.scheduleMain(state);
+        }
         return false;
     }
 
diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 2282467..1c47414 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -38,6 +38,7 @@ import org.apache.camel.Traceable;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.ExchangeHelper;
@@ -143,20 +144,16 @@ public class WireTapProcessor extends AsyncProcessorSupport implements Traceable
         final Exchange wireTapExchange = target;
 
         // send the exchange to the destination using an executor service
-        executorService.submit(new Callable<Exchange>() {
-            public Exchange call() throws Exception {
+        executorService.submit(() -> {
                 taskCount.increment();
-                try {
-                    log.debug(">>>> (wiretap) {} {}", uri, wireTapExchange);
-                    processor.process(wireTapExchange);
-                } catch (Throwable e) {
-                    log.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.", e);
-                } finally {
+                log.debug(">>>> (wiretap) {} {}", uri, wireTapExchange);
+                AsyncProcessorConverterHelper.convert(processor).process(wireTapExchange, doneSync -> {
+                    if (wireTapExchange.getException() != null) {
+                        log.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.", wireTapExchange.getException());
+                    }
                     taskCount.decrement();
-                }
-                return wireTapExchange;
-            }
-        });
+                });
+            });
 
         // continue routing this synchronously
         callback.done(true);
diff --git a/camel-core/src/main/java/org/apache/camel/support/ReactiveHelper.java b/camel-core/src/main/java/org/apache/camel/support/ReactiveHelper.java
index f5a66e6..3676d60 100644
--- a/camel-core/src/main/java/org/apache/camel/support/ReactiveHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/support/ReactiveHelper.java
@@ -33,23 +33,31 @@ public class ReactiveHelper {
     }
 
     public static void scheduleMain(Runnable runnable) {
-        WORKERS.get().schedule(runnable, true, true);
+        WORKERS.get().schedule(runnable, true, true, false);
+    }
+
+    public static void scheduleSync(Runnable runnable) {
+        WORKERS.get().schedule(runnable, true, true, true);
     }
 
     public static void scheduleMain(Runnable runnable, String description) {
-        WORKERS.get().schedule(describe(runnable, description), true, true);
+        WORKERS.get().schedule(describe(runnable, description), true, true, false);
     }
 
     public static void schedule(Runnable runnable) {
-        WORKERS.get().schedule(runnable, true, false);
+        WORKERS.get().schedule(runnable, true, false, false);
     }
 
     public static void schedule(Runnable runnable, String description) {
-        WORKERS.get().schedule(describe(runnable, description), true, false);
+        WORKERS.get().schedule(describe(runnable, description), true, false, false);
     }
 
     public static void scheduleLast(Runnable runnable, String description) {
-        WORKERS.get().schedule(describe(runnable, description), false, false);
+        WORKERS.get().schedule(describe(runnable, description), false, false, false);
+    }
+
+    public static void scheduleSync(Runnable runnable, String description) {
+        WORKERS.get().schedule(describe(runnable, description), false, true, true);
     }
 
     public static boolean executeFromQueue() {
@@ -88,7 +96,7 @@ public class ReactiveHelper {
         LinkedList<LinkedList<Runnable>> back;
         boolean running;
 
-        public void schedule(Runnable runnable, boolean first, boolean main) {
+        public void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
             if (main) {
                 if (!queue.isEmpty()) {
                     if (back == null) {
@@ -103,7 +111,7 @@ public class ReactiveHelper {
             } else {
                 queue.addLast(runnable);
             }
-            if (!running) {
+            if (!running || sync) {
                 running = true;
 //                Thread thread = Thread.currentThread();
 //                String name = thread.getName();
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java
index d8f369b..eabd5b4 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java
@@ -22,12 +22,11 @@ import java.util.concurrent.Future;
 import com.surftools.BeanstalkClient.BeanstalkException;
 import com.surftools.BeanstalkClient.Client;
 import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.beanstalk.processors.Command;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
 
-public class BeanstalkProducer extends DefaultProducer implements AsyncProcessor {
+public class BeanstalkProducer extends DefaultAsyncProducer {
     private ExecutorService executor;
     private Client client;
     private final Command command;
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentTest.java
index dc4fdff..32fcbec 100644
--- a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentTest.java
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentTest.java
@@ -126,9 +126,9 @@ public class DisruptorConcurrentTest extends CamelTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("disruptor:foo?concurrentConsumers=10").to("mock:before").delay(2000).to("mock:result");
+                from("disruptor:foo?concurrentConsumers=10").to("mock:before").delay(2000).syncDelayed().to("mock:result");
 
-                from("disruptor:bar?concurrentConsumers=10").to("mock:before").delay(2000)
+                from("disruptor:bar?concurrentConsumers=10").to("mock:before").delay(2000).syncDelayed()
                         .transform(body().prepend("Bye ")).to("mock:result");
             }
         };
diff --git a/components/camel-docker/src/main/java/org/apache/camel/component/docker/producer/AsyncDockerProducer.java b/components/camel-docker/src/main/java/org/apache/camel/component/docker/producer/AsyncDockerProducer.java
index 4fc1f16..186f6e4 100644
--- a/components/camel-docker/src/main/java/org/apache/camel/component/docker/producer/AsyncDockerProducer.java
+++ b/components/camel-docker/src/main/java/org/apache/camel/component/docker/producer/AsyncDockerProducer.java
@@ -215,15 +215,12 @@ public class AsyncDockerProducer extends DefaultAsyncProducer {
             // If request included a response, set as body
             if (result != null) {
                 exchange.getIn().setBody(result);
-
-                return true;
             }
         } catch (DockerException | InterruptedException | IOException e) {
             log.error(e.getMessage(), e);
-
-            return false;
         }
 
+        callback.done(false);
         return false;
     }
 
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java
index 97cb857..626134d 100644
--- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java
@@ -21,9 +21,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.AsyncProcessorSupport;
 import org.junit.Test;
 
 /**
@@ -70,7 +70,7 @@ public class FromFtpAsyncProcessTest extends FtpServerTestSupport {
         };
     }
 
-    private class MyAsyncProcessor implements AsyncProcessor {
+    private class MyAsyncProcessor extends AsyncProcessorSupport {
 
         private ExecutorService executor = Executors.newSingleThreadExecutor();
 
@@ -93,9 +93,5 @@ public class FromFtpAsyncProcessTest extends FtpServerTestSupport {
             return false;
         }
 
-        @Override
-        public void process(Exchange exchange) throws Exception {
-            // noop
-        }
     }
 }
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpShutdownCompleteCurrentTaskOnlyTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpShutdownCompleteCurrentTaskOnlyTest.java
index 9509cc7..ea1cdb5 100644
--- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpShutdownCompleteCurrentTaskOnlyTest.java
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpShutdownCompleteCurrentTaskOnlyTest.java
@@ -75,7 +75,7 @@ public class FtpShutdownCompleteCurrentTaskOnlyTest extends FtpServerTestSupport
                 from(getFtpUrl()).routeId("route1")
                     // let it complete only current task so we shutdown faster
                     .shutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly)
-                    .delay(1000).to("seda:foo");
+                    .delay(1000).syncDelayed().to("seda:foo");
 
                 from("seda:foo").routeId("route2").to("mock:bar");
             }
diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
index 188cce2..da3b9dd 100644
--- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
+++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
@@ -28,7 +28,6 @@ import io.grpc.stub.StreamObserver;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.SslProvider;
 import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.grpc.auth.jwt.JwtCallCredentials;
 import org.apache.camel.component.grpc.auth.jwt.JwtHelper;
@@ -36,15 +35,15 @@ import org.apache.camel.component.grpc.client.GrpcExchangeForwarder;
 import org.apache.camel.component.grpc.client.GrpcExchangeForwarderFactory;
 import org.apache.camel.component.grpc.client.GrpcResponseAggregationStreamObserver;
 import org.apache.camel.component.grpc.client.GrpcResponseRouterStreamObserver;
-import org.apache.camel.support.DefaultProducer;
 import org.apache.camel.spi.ClassResolver;
-import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  * Represents asynchronous and synchronous gRPC producer implementations.
  */
-public class GrpcProducer extends DefaultProducer implements AsyncProcessor {
+public class GrpcProducer extends DefaultAsyncProducer {
 
     protected final GrpcConfiguration configuration;
     protected final GrpcEndpoint endpoint;
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
index 140c929..94a251f 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
@@ -89,7 +89,8 @@ public class IgniteCacheProducer extends DefaultAsyncProducer {
             break;
         }
 
-        return true;
+        callback.done(false);
+        return false;
     }
 
     @SuppressWarnings("unchecked")
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java
index 7300416..78b29dd 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java
@@ -85,17 +85,18 @@ public class IgniteComputeProducer extends DefaultAsyncProducer {
                 
             default:
                 exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite Compute producer."));
-                return true;
+                callback.done(false);
+                return false;
             }
 
             compute.future().listen(IgniteInCamelClosure.create(exchange, callback));
+            return false;
 
         } catch (Exception e) {
             exchange.setException(e);
-            return true;
+            callback.done(false);
+            return false;
         }
-
-        return false;
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenProducer.java
index d12a64f..6a99de7 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenProducer.java
@@ -70,10 +70,11 @@ public class IgniteIdGenProducer extends DefaultAsyncProducer {
             
         default:
             exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite ID Generator producer."));
-            return true;
+            break;
         }
 
-        return true;
+        callback.done(false);
+        return false;
     }
 
     private IgniteIdGenOperation idGenOperationFor(Exchange exchange) {
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java
index 1a0c017..c0b9947 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java
@@ -61,8 +61,8 @@ public class IgniteMessagingProducer extends DefaultAsyncProducer {
         }
 
         IgniteHelper.maybePropagateIncomingBody(endpoint, in, out);
-
-        return true;
+        callback.done(false);
+        return false;
     }
 
     private String topicFor(Exchange exchange) {
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueProducer.java
index a2d929c..1104bd8 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueProducer.java
@@ -157,10 +157,11 @@ public class IgniteQueueProducer extends DefaultAsyncProducer {
             
         default:
             exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite Queue producer."));
-            return true;
+            break;
         }
 
-        return true;
+        callback.done(false);
+        return false;
     }
 
     private IgniteQueueOperation queueOperationFor(Exchange exchange) {
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetProducer.java
index c21e8d8..00f6d36 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetProducer.java
@@ -107,10 +107,11 @@ public class IgniteSetProducer extends DefaultAsyncProducer {
             
         default:
             exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite Set producer."));
-            return true;
+            break;
         }
 
-        return true;
+        callback.done(false);
+        return false;
     }
 
     private IgniteSetOperation setOperationFor(Exchange exchange) {
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRedeliveryTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRedeliveryTest.java
index 24ce1f8..74d710e 100644
--- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRedeliveryTest.java
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRedeliveryTest.java
@@ -62,7 +62,7 @@ public class NettyRedeliveryTest extends CamelTestSupport {
     @EndpointInject(uri = "mock:downstream")
     private MockEndpoint downstream;
 
-    private Deque<Callable<?>> tasks = new LinkedBlockingDeque<>();
+    private Deque<Object> tasks = new LinkedBlockingDeque<>();
     private int port;
     private boolean alive = true;
 
@@ -83,6 +83,7 @@ public class NettyRedeliveryTest extends CamelTestSupport {
                         .retriesExhaustedLogLevel(LoggingLevel.ERROR)
                         // lets have a little delay so we do async redelivery
                         .redeliveryDelay(10)
+                        .asyncDelayedRedelivery()
                         .to("mock:exception")
                         .handled(true);
 
@@ -149,7 +150,7 @@ public class NettyRedeliveryTest extends CamelTestSupport {
             @Override
             public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                 if ("submit".equals(method.getName()) || "schedule".equals(method.getName())) {
-                    tasks.add((Callable<?>) args[0]);
+                    tasks.add(args[0]);
                 }
                 return method.invoke(delegate, args);
             }