You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2018/11/22 16:33:25 UTC
[camel] 08/11: Fix async engine / tests
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6094865d0d60b0dd0f14db6e6a9e24b7a6684cf7
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Wed Nov 21 11:40:35 2018 +0100
Fix async engine / tests
---
.../apache/camel/impl/AbstractCamelContext.java | 87 ++++++++++++----------
.../org/apache/camel/processor/LoopProcessor.java | 6 +-
.../apache/camel/processor/MulticastProcessor.java | 6 +-
.../java/org/apache/camel/processor/Pipeline.java | 9 ++-
.../camel/processor/RedeliveryErrorHandler.java | 6 +-
.../apache/camel/processor/WireTapProcessor.java | 21 +++---
.../org/apache/camel/support/ReactiveHelper.java | 22 ++++--
.../component/beanstalk/BeanstalkProducer.java | 5 +-
.../disruptor/DisruptorConcurrentTest.java | 4 +-
.../docker/producer/AsyncDockerProducer.java | 5 +-
.../file/remote/FromFtpAsyncProcessTest.java | 8 +-
.../FtpShutdownCompleteCurrentTaskOnlyTest.java | 2 +-
.../apache/camel/component/grpc/GrpcProducer.java | 7 +-
.../ignite/cache/IgniteCacheProducer.java | 3 +-
.../ignite/compute/IgniteComputeProducer.java | 9 ++-
.../ignite/idgen/IgniteIdGenProducer.java | 5 +-
.../ignite/messaging/IgniteMessagingProducer.java | 4 +-
.../ignite/queue/IgniteQueueProducer.java | 5 +-
.../component/ignite/set/IgniteSetProducer.java | 5 +-
.../component/netty4/NettyRedeliveryTest.java | 5 +-
20 files changed, 126 insertions(+), 98 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/impl/AbstractCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/AbstractCamelContext.java
index 00aa494..146edf1 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/AbstractCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/AbstractCamelContext.java
@@ -234,8 +234,6 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
private Boolean allowUseOriginalMessage = Boolean.FALSE;
private Long delay;
private ErrorHandlerFactory errorHandlerBuilder;
- private final Object errorHandlerExecutorServiceLock = new Object();
- private ScheduledExecutorService errorHandlerExecutorService;
private Map<String, DataFormatDefinition> dataFormats = new HashMap<>();
private Map<String, String> globalOptions = new HashMap<>();
private PropertiesComponent propertiesComponent;
@@ -243,10 +241,10 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
private final Map<String, RouteService> routeServices = new LinkedHashMap<>();
private final Map<String, RouteService> suspendedRouteServices = new LinkedHashMap<>();
+ private final Object lock = new Object();
private volatile CamelContextNameStrategy nameStrategy;
private volatile ManagementNameStrategy managementNameStrategy;
- private Registry registry;
-
+ private volatile Registry registry;
private volatile TypeConverter typeConverter;
private volatile TypeConverterRegistry typeConverterRegistry;
private volatile Injector injector;
@@ -276,6 +274,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
private volatile UnitOfWorkFactory unitOfWorkFactory;
private volatile ReloadStrategy reloadStrategy;
private volatile RouteController routeController;
+ private volatile ScheduledExecutorService errorHandlerExecutorService;
private TransformerRegistry<TransformerKey> transformerRegistry;
private ValidatorRegistry<ValidatorKey> validatorRegistry;
@@ -413,7 +412,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public CamelContextNameStrategy getNameStrategy() {
if (nameStrategy == null) {
- synchronized (this) {
+ synchronized (lock) {
if (nameStrategy == null) {
setNameStrategy(createCamelContextNameStrategy());
}
@@ -428,7 +427,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public ManagementNameStrategy getManagementNameStrategy() {
if (managementNameStrategy == null) {
- synchronized (this) {
+ synchronized (lock) {
if (managementNameStrategy == null) {
setManagementNameStrategy(createManagementNameStrategy());
}
@@ -913,7 +912,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
@Override
public RouteController getRouteController() {
if (routeController == null) {
- synchronized (this) {
+ synchronized (lock) {
if (routeController == null) {
setRouteController(createRouteController());
}
@@ -2456,7 +2455,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public TypeConverter getTypeConverter() {
if (typeConverter == null) {
- synchronized (this) {
+ synchronized (lock) {
if (typeConverter == null) {
setTypeConverter(createTypeConverter());
}
@@ -2471,7 +2470,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public TypeConverterRegistry getTypeConverterRegistry() {
if (typeConverterRegistry == null) {
- synchronized (this) {
+ synchronized (lock) {
if (typeConverterRegistry == null) {
setTypeConverterRegistry(createTypeConverterRegistry());
}
@@ -2486,7 +2485,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public Injector getInjector() {
if (injector == null) {
- synchronized (this) {
+ synchronized (lock) {
if (injector == null) {
setInjector(createInjector());
}
@@ -2509,7 +2508,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public ComponentResolver getComponentResolver() {
if (componentResolver == null) {
- synchronized (this) {
+ synchronized (lock) {
if (componentResolver == null) {
setComponentResolver(createComponentResolver());
}
@@ -2524,7 +2523,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public LanguageResolver getLanguageResolver() {
if (languageResolver == null) {
- synchronized (this) {
+ synchronized (lock) {
if (languageResolver == null) {
setLanguageResolver(createLanguageResolver());
}
@@ -2547,7 +2546,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public Registry getRegistry() {
if (registry == null) {
- synchronized (this) {
+ synchronized (lock) {
if (registry == null) {
setRegistry(createRegistry());
}
@@ -2875,22 +2874,32 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
}
public ScheduledExecutorService getErrorHandlerExecutorService() {
- synchronized (errorHandlerExecutorServiceLock) {
- if (errorHandlerExecutorService == null) {
- // setup default thread pool for error handler
- errorHandlerExecutorService = getExecutorServiceManager().newDefaultScheduledThreadPool("ErrorHandlerRedeliveryThreadPool", "ErrorHandlerRedeliveryTask");
+ if (errorHandlerExecutorService == null) {
+ synchronized (lock) {
+ if (errorHandlerExecutorService == null) {
+ // setup default thread pool for error handler
+ errorHandlerExecutorService = createErrorHandlerExecutorService();
+ }
}
}
return errorHandlerExecutorService;
}
+ protected ScheduledExecutorService createErrorHandlerExecutorService() {
+ return getExecutorServiceManager().newDefaultScheduledThreadPool("ErrorHandlerRedeliveryThreadPool", "ErrorHandlerRedeliveryTask");
+ }
+
+ public void setErrorHandlerExecutorService(ScheduledExecutorService errorHandlerExecutorService) {
+ this.errorHandlerExecutorService = errorHandlerExecutorService;
+ }
+
public void setProducerServicePool(ServicePool<Producer> producerServicePool) {
this.producerServicePool = doAddService(producerServicePool);
}
public ServicePool<Producer> getProducerServicePool() {
if (producerServicePool == null) {
- synchronized (this) {
+ synchronized (lock) {
if (producerServicePool == null) {
setProducerServicePool(createProducerServicePool());
}
@@ -2901,7 +2910,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public ServicePool<PollingConsumer> getPollingConsumerServicePool() {
if (pollingConsumerServicePool == null) {
- synchronized (this) {
+ synchronized (lock) {
if (pollingConsumerServicePool == null) {
setPollingConsumerServicePool(createPollingConsumerServicePool());
}
@@ -2916,7 +2925,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public UnitOfWorkFactory getUnitOfWorkFactory() {
if (unitOfWorkFactory == null) {
- synchronized (this) {
+ synchronized (lock) {
if (unitOfWorkFactory == null) {
setUnitOfWorkFactory(createUnitOfWorkFactory());
}
@@ -2954,7 +2963,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public String getVersion() {
if (version == null) {
- synchronized (this) {
+ synchronized (lock) {
if (version == null) {
version = doGetVersion();
}
@@ -4121,7 +4130,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public FactoryFinder getDefaultFactoryFinder() {
if (defaultFactoryFinder == null) {
- synchronized (this) {
+ synchronized (lock) {
if (defaultFactoryFinder == null) {
defaultFactoryFinder = getFactoryFinderResolver().resolveDefaultFactoryFinder(getClassResolver());
}
@@ -4132,7 +4141,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public FactoryFinderResolver getFactoryFinderResolver() {
if (factoryFinderResolver == null) {
- synchronized (this) {
+ synchronized (lock) {
if (factoryFinderResolver == null) {
factoryFinderResolver = createFactoryFinderResolver();
}
@@ -4155,7 +4164,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public ClassResolver getClassResolver() {
if (classResolver == null) {
- synchronized (this) {
+ synchronized (lock) {
if (classResolver == null) {
setClassResolver(createClassResolver());
}
@@ -4170,7 +4179,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public PackageScanClassResolver getPackageScanClassResolver() {
if (packageScanClassResolver == null) {
- synchronized (this) {
+ synchronized (lock) {
if (packageScanClassResolver == null) {
setPackageScanClassResolver(createPackageScanClassResolver());
}
@@ -4193,7 +4202,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public ModelJAXBContextFactory getModelJAXBContextFactory() {
if (modelJAXBContextFactory == null) {
- synchronized (this) {
+ synchronized (lock) {
if (modelJAXBContextFactory == null) {
setModelJAXBContextFactory(createModelJAXBContextFactory());
}
@@ -4208,7 +4217,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public NodeIdFactory getNodeIdFactory() {
if (nodeIdFactory == null) {
- synchronized (this) {
+ synchronized (lock) {
if (nodeIdFactory == null) {
setNodeIdFactory(createNodeIdFactory());
}
@@ -4223,7 +4232,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public ManagementStrategy getManagementStrategy() {
if (managementStrategy == null) {
- synchronized (this) {
+ synchronized (lock) {
if (managementStrategy == null) {
setManagementStrategy(createManagementStrategy());
}
@@ -4254,7 +4263,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public InflightRepository getInflightRepository() {
if (inflightRepository == null) {
- synchronized (this) {
+ synchronized (lock) {
if (inflightRepository == null) {
setInflightRepository(createInflightRepository());
}
@@ -4269,7 +4278,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
if (asyncProcessorAwaitManager == null) {
- synchronized (this) {
+ synchronized (lock) {
if (asyncProcessorAwaitManager == null) {
setAsyncProcessorAwaitManager(createAsyncProcessorAwaitManager());
}
@@ -4341,7 +4350,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public DataFormatResolver getDataFormatResolver() {
if (dataFormatResolver == null) {
- synchronized (this) {
+ synchronized (lock) {
if (dataFormatResolver == null) {
setDataFormatResolver(createDataFormatResolver());
}
@@ -4396,7 +4405,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public ShutdownStrategy getShutdownStrategy() {
if (shutdownStrategy == null) {
- synchronized (this) {
+ synchronized (lock) {
if (shutdownStrategy == null) {
setShutdownStrategy(createShutdownStrategy());
}
@@ -4435,7 +4444,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public ExecutorServiceManager getExecutorServiceManager() {
if (executorServiceManager == null) {
- synchronized (this) {
+ synchronized (lock) {
if (executorServiceManager == null) {
setExecutorServiceManager(createExecutorServiceManager());
}
@@ -4451,7 +4460,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public ProcessorFactory getProcessorFactory() {
if (processorFactory == null) {
- synchronized (this) {
+ synchronized (lock) {
if (processorFactory == null) {
setProcessorFactory(createProcessorFactory());
}
@@ -4466,7 +4475,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public MessageHistoryFactory getMessageHistoryFactory() {
if (messageHistoryFactory == null) {
- synchronized (this) {
+ synchronized (lock) {
if (messageHistoryFactory == null) {
setMessageHistoryFactory(createMessageHistoryFactory());
}
@@ -4492,7 +4501,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public UuidGenerator getUuidGenerator() {
if (uuidGenerator == null) {
- synchronized (this) {
+ synchronized (lock) {
if (uuidGenerator == null) {
setUuidGenerator(createUuidGenerator());
}
@@ -4507,7 +4516,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public StreamCachingStrategy getStreamCachingStrategy() {
if (streamCachingStrategy == null) {
- synchronized (this) {
+ synchronized (lock) {
if (streamCachingStrategy == null) {
setStreamCachingStrategy(createStreamCachingStrategy());
}
@@ -4522,7 +4531,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
public RestRegistry getRestRegistry() {
if (restRegistry == null) {
- synchronized (this) {
+ synchronized (lock) {
if (restRegistry == null) {
setRestRegistry(createRestRegistry());
}
@@ -4616,7 +4625,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod
@Override
public HeadersMapFactory getHeadersMapFactory() {
if (headersMapFactory == null) {
- synchronized (this) {
+ synchronized (lock) {
if (headersMapFactory == null) {
setHeadersMapFactory(createHeadersMapFactory());
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 92614a0..b6aecbc 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -52,7 +52,11 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
LoopState state = new LoopState(exchange, callback);
- ReactiveHelper.scheduleMain(state);
+ if (exchange.isTransacted()) {
+ ReactiveHelper.scheduleSync(state);
+ } else {
+ ReactiveHelper.scheduleMain(state);
+ }
return false;
} catch (Exception e) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index df08df0..ccb6e23 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -223,7 +223,11 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
if (isParallelProcessing()) {
executorService.submit(() -> ReactiveHelper.schedule(state));
} else {
- ReactiveHelper.scheduleMain(state);
+ if (exchange.isTransacted()) {
+ ReactiveHelper.scheduleSync(state);
+ } else {
+ ReactiveHelper.scheduleMain(state);
+ }
}
// the remainder of the multicast will be completed async
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
index 0d7d4e5..90bee9c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -81,8 +81,13 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
- ReactiveHelper.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
- "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
+ if (exchange.isTransacted()) {
+ ReactiveHelper.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
+ "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
+ } else {
+ ReactiveHelper.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
+ "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
+ }
return false;
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index 1a87121..1577ad1 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -152,7 +152,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
// Create the redelivery state object for this exchange
RedeliveryState state = new RedeliveryState(exchange, callback);
// Run it
- ReactiveHelper.scheduleMain(state);
+ if (exchange.isTransacted()) {
+ ReactiveHelper.scheduleSync(state);
+ } else {
+ ReactiveHelper.scheduleMain(state);
+ }
return false;
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 2282467..1c47414 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -38,6 +38,7 @@ import org.apache.camel.Traceable;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.ExchangeHelper;
@@ -143,20 +144,16 @@ public class WireTapProcessor extends AsyncProcessorSupport implements Traceable
final Exchange wireTapExchange = target;
// send the exchange to the destination using an executor service
- executorService.submit(new Callable<Exchange>() {
- public Exchange call() throws Exception {
+ executorService.submit(() -> {
taskCount.increment();
- try {
- log.debug(">>>> (wiretap) {} {}", uri, wireTapExchange);
- processor.process(wireTapExchange);
- } catch (Throwable e) {
- log.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.", e);
- } finally {
+ log.debug(">>>> (wiretap) {} {}", uri, wireTapExchange);
+ AsyncProcessorConverterHelper.convert(processor).process(wireTapExchange, doneSync -> {
+ if (wireTapExchange.getException() != null) {
+ log.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.", wireTapExchange.getException());
+ }
taskCount.decrement();
- }
- return wireTapExchange;
- }
- });
+ });
+ });
// continue routing this synchronously
callback.done(true);
diff --git a/camel-core/src/main/java/org/apache/camel/support/ReactiveHelper.java b/camel-core/src/main/java/org/apache/camel/support/ReactiveHelper.java
index f5a66e6..3676d60 100644
--- a/camel-core/src/main/java/org/apache/camel/support/ReactiveHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/support/ReactiveHelper.java
@@ -33,23 +33,31 @@ public class ReactiveHelper {
}
public static void scheduleMain(Runnable runnable) {
- WORKERS.get().schedule(runnable, true, true);
+ WORKERS.get().schedule(runnable, true, true, false);
+ }
+
+ public static void scheduleSync(Runnable runnable) {
+ WORKERS.get().schedule(runnable, true, true, true);
}
public static void scheduleMain(Runnable runnable, String description) {
- WORKERS.get().schedule(describe(runnable, description), true, true);
+ WORKERS.get().schedule(describe(runnable, description), true, true, false);
}
public static void schedule(Runnable runnable) {
- WORKERS.get().schedule(runnable, true, false);
+ WORKERS.get().schedule(runnable, true, false, false);
}
public static void schedule(Runnable runnable, String description) {
- WORKERS.get().schedule(describe(runnable, description), true, false);
+ WORKERS.get().schedule(describe(runnable, description), true, false, false);
}
public static void scheduleLast(Runnable runnable, String description) {
- WORKERS.get().schedule(describe(runnable, description), false, false);
+ WORKERS.get().schedule(describe(runnable, description), false, false, false);
+ }
+
+ public static void scheduleSync(Runnable runnable, String description) {
+ WORKERS.get().schedule(describe(runnable, description), false, true, true);
}
public static boolean executeFromQueue() {
@@ -88,7 +96,7 @@ public class ReactiveHelper {
LinkedList<LinkedList<Runnable>> back;
boolean running;
- public void schedule(Runnable runnable, boolean first, boolean main) {
+ public void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
if (main) {
if (!queue.isEmpty()) {
if (back == null) {
@@ -103,7 +111,7 @@ public class ReactiveHelper {
} else {
queue.addLast(runnable);
}
- if (!running) {
+ if (!running || sync) {
running = true;
// Thread thread = Thread.currentThread();
// String name = thread.getName();
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java
index d8f369b..eabd5b4 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java
@@ -22,12 +22,11 @@ import java.util.concurrent.Future;
import com.surftools.BeanstalkClient.BeanstalkException;
import com.surftools.BeanstalkClient.Client;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.component.beanstalk.processors.Command;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
-public class BeanstalkProducer extends DefaultProducer implements AsyncProcessor {
+public class BeanstalkProducer extends DefaultAsyncProducer {
private ExecutorService executor;
private Client client;
private final Command command;
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentTest.java
index dc4fdff..32fcbec 100644
--- a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentTest.java
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorConcurrentTest.java
@@ -126,9 +126,9 @@ public class DisruptorConcurrentTest extends CamelTestSupport {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("disruptor:foo?concurrentConsumers=10").to("mock:before").delay(2000).to("mock:result");
+ from("disruptor:foo?concurrentConsumers=10").to("mock:before").delay(2000).syncDelayed().to("mock:result");
- from("disruptor:bar?concurrentConsumers=10").to("mock:before").delay(2000)
+ from("disruptor:bar?concurrentConsumers=10").to("mock:before").delay(2000).syncDelayed()
.transform(body().prepend("Bye ")).to("mock:result");
}
};
diff --git a/components/camel-docker/src/main/java/org/apache/camel/component/docker/producer/AsyncDockerProducer.java b/components/camel-docker/src/main/java/org/apache/camel/component/docker/producer/AsyncDockerProducer.java
index 4fc1f16..186f6e4 100644
--- a/components/camel-docker/src/main/java/org/apache/camel/component/docker/producer/AsyncDockerProducer.java
+++ b/components/camel-docker/src/main/java/org/apache/camel/component/docker/producer/AsyncDockerProducer.java
@@ -215,15 +215,12 @@ public class AsyncDockerProducer extends DefaultAsyncProducer {
// If request included a response, set as body
if (result != null) {
exchange.getIn().setBody(result);
-
- return true;
}
} catch (DockerException | InterruptedException | IOException e) {
log.error(e.getMessage(), e);
-
- return false;
}
+ callback.done(false);
return false;
}
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java
index 97cb857..626134d 100644
--- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java
@@ -21,9 +21,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.junit.Test;
/**
@@ -70,7 +70,7 @@ public class FromFtpAsyncProcessTest extends FtpServerTestSupport {
};
}
- private class MyAsyncProcessor implements AsyncProcessor {
+ private class MyAsyncProcessor extends AsyncProcessorSupport {
private ExecutorService executor = Executors.newSingleThreadExecutor();
@@ -93,9 +93,5 @@ public class FromFtpAsyncProcessTest extends FtpServerTestSupport {
return false;
}
- @Override
- public void process(Exchange exchange) throws Exception {
- // noop
- }
}
}
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpShutdownCompleteCurrentTaskOnlyTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpShutdownCompleteCurrentTaskOnlyTest.java
index 9509cc7..ea1cdb5 100644
--- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpShutdownCompleteCurrentTaskOnlyTest.java
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpShutdownCompleteCurrentTaskOnlyTest.java
@@ -75,7 +75,7 @@ public class FtpShutdownCompleteCurrentTaskOnlyTest extends FtpServerTestSupport
from(getFtpUrl()).routeId("route1")
// let it complete only current task so we shutdown faster
.shutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly)
- .delay(1000).to("seda:foo");
+ .delay(1000).syncDelayed().to("seda:foo");
from("seda:foo").routeId("route2").to("mock:bar");
}
diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
index 188cce2..da3b9dd 100644
--- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
+++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
@@ -28,7 +28,6 @@ import io.grpc.stub.StreamObserver;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.component.grpc.auth.jwt.JwtCallCredentials;
import org.apache.camel.component.grpc.auth.jwt.JwtHelper;
@@ -36,15 +35,15 @@ import org.apache.camel.component.grpc.client.GrpcExchangeForwarder;
import org.apache.camel.component.grpc.client.GrpcExchangeForwarderFactory;
import org.apache.camel.component.grpc.client.GrpcResponseAggregationStreamObserver;
import org.apache.camel.component.grpc.client.GrpcResponseRouterStreamObserver;
-import org.apache.camel.support.DefaultProducer;
import org.apache.camel.spi.ClassResolver;
-import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.ObjectHelper;
/**
* Represents asynchronous and synchronous gRPC producer implementations.
*/
-public class GrpcProducer extends DefaultProducer implements AsyncProcessor {
+public class GrpcProducer extends DefaultAsyncProducer {
protected final GrpcConfiguration configuration;
protected final GrpcEndpoint endpoint;
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
index 140c929..94a251f 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
@@ -89,7 +89,8 @@ public class IgniteCacheProducer extends DefaultAsyncProducer {
break;
}
- return true;
+ callback.done(false);
+ return false;
}
@SuppressWarnings("unchecked")
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java
index 7300416..78b29dd 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java
@@ -85,17 +85,18 @@ public class IgniteComputeProducer extends DefaultAsyncProducer {
default:
exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite Compute producer."));
- return true;
+ callback.done(false);
+ return false;
}
compute.future().listen(IgniteInCamelClosure.create(exchange, callback));
+ return false;
} catch (Exception e) {
exchange.setException(e);
- return true;
+ callback.done(false);
+ return false;
}
-
- return false;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenProducer.java
index d12a64f..6a99de7 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenProducer.java
@@ -70,10 +70,11 @@ public class IgniteIdGenProducer extends DefaultAsyncProducer {
default:
exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite ID Generator producer."));
- return true;
+ break;
}
- return true;
+ callback.done(false);
+ return false;
}
private IgniteIdGenOperation idGenOperationFor(Exchange exchange) {
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java
index 1a0c017..c0b9947 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java
@@ -61,8 +61,8 @@ public class IgniteMessagingProducer extends DefaultAsyncProducer {
}
IgniteHelper.maybePropagateIncomingBody(endpoint, in, out);
-
- return true;
+ callback.done(false);
+ return false;
}
private String topicFor(Exchange exchange) {
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueProducer.java
index a2d929c..1104bd8 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueProducer.java
@@ -157,10 +157,11 @@ public class IgniteQueueProducer extends DefaultAsyncProducer {
default:
exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite Queue producer."));
- return true;
+ break;
}
- return true;
+ callback.done(false);
+ return false;
}
private IgniteQueueOperation queueOperationFor(Exchange exchange) {
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetProducer.java
index c21e8d8..00f6d36 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetProducer.java
@@ -107,10 +107,11 @@ public class IgniteSetProducer extends DefaultAsyncProducer {
default:
exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite Set producer."));
- return true;
+ break;
}
- return true;
+ callback.done(false);
+ return false;
}
private IgniteSetOperation setOperationFor(Exchange exchange) {
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRedeliveryTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRedeliveryTest.java
index 24ce1f8..74d710e 100644
--- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRedeliveryTest.java
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRedeliveryTest.java
@@ -62,7 +62,7 @@ public class NettyRedeliveryTest extends CamelTestSupport {
@EndpointInject(uri = "mock:downstream")
private MockEndpoint downstream;
- private Deque<Callable<?>> tasks = new LinkedBlockingDeque<>();
+ private Deque<Object> tasks = new LinkedBlockingDeque<>();
private int port;
private boolean alive = true;
@@ -83,6 +83,7 @@ public class NettyRedeliveryTest extends CamelTestSupport {
.retriesExhaustedLogLevel(LoggingLevel.ERROR)
// lets have a little delay so we do async redelivery
.redeliveryDelay(10)
+ .asyncDelayedRedelivery()
.to("mock:exception")
.handled(true);
@@ -149,7 +150,7 @@ public class NettyRedeliveryTest extends CamelTestSupport {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("submit".equals(method.getName()) || "schedule".equals(method.getName())) {
- tasks.add((Callable<?>) args[0]);
+ tasks.add(args[0]);
}
return method.invoke(delegate, args);
}