You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2018/11/22 16:33:21 UTC
[camel] 04/11: Introduce AsyncProcessorSupport to reduce the number
of dumb sync process() implementations
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6376c6ee3046a710948346d2abc32c4e7df0f867
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Tue Nov 13 13:58:51 2018 +0100
Introduce AsyncProcessorSupport to reduce the number of dumb sync process() implementations
---
.../component/bean/AbstractBeanProcessor.java | 24 ++++------
.../impl/InterceptSendToEndpointProcessor.java | 8 +---
.../camel/impl/SubscribeMethodProcessor.java | 10 +---
.../impl/cloud/DefaultServiceCallProcessor.java | 17 ++-----
.../apache/camel/processor/CamelLogProcessor.java | 54 +++++++++-------------
.../apache/camel/processor/ChoiceProcessor.java | 9 +---
.../camel/processor/ClaimCheckProcessor.java | 10 +---
.../camel/processor/DelegateAsyncProcessor.java | 11 +----
.../java/org/apache/camel/processor/Enricher.java | 9 +---
.../processor/EvaluateExpressionProcessor.java | 10 +---
.../camel/processor/ExchangePatternProcessor.java | 10 +---
.../org/apache/camel/processor/LogProcessor.java | 10 +---
.../apache/camel/processor/MarshalProcessor.java | 10 +---
.../apache/camel/processor/MulticastProcessor.java | 10 +---
.../camel/processor/OnCompletionProcessor.java | 10 +---
.../java/org/apache/camel/processor/Pipeline.java | 10 +---
.../org/apache/camel/processor/PollEnricher.java | 10 +---
.../org/apache/camel/processor/RecipientList.java | 10 +---
.../camel/processor/RedeliveryErrorHandler.java | 2 +-
.../camel/processor/RemoveHeaderProcessor.java | 10 +---
.../camel/processor/RemoveHeadersProcessor.java | 10 +---
.../camel/processor/RemovePropertiesProcessor.java | 10 +---
.../camel/processor/RemovePropertyProcessor.java | 10 +---
.../org/apache/camel/processor/Resequencer.java | 9 +---
.../apache/camel/processor/RollbackProcessor.java | 10 +---
.../org/apache/camel/processor/RoutingSlip.java | 16 ++-----
.../apache/camel/processor/SamplingThrottler.java | 19 +-------
.../apache/camel/processor/ScriptProcessor.java | 10 +---
.../camel/processor/SendDynamicProcessor.java | 10 +---
.../org/apache/camel/processor/SendProcessor.java | 10 +---
.../apache/camel/processor/SetBodyProcessor.java | 10 +---
.../apache/camel/processor/SetHeaderProcessor.java | 10 +---
.../camel/processor/SetPropertyProcessor.java | 10 +---
.../org/apache/camel/processor/SortProcessor.java | 10 +---
.../org/apache/camel/processor/StopProcessor.java | 10 +---
.../apache/camel/processor/StreamResequencer.java | 10 +---
.../apache/camel/processor/ThreadsProcessor.java | 10 +---
.../java/org/apache/camel/processor/Throttler.java | 11 +----
.../apache/camel/processor/ThroughputLogger.java | 3 +-
.../camel/processor/ThrowExceptionProcessor.java | 10 +---
.../apache/camel/processor/TransformProcessor.java | 10 +---
.../org/apache/camel/processor/TryProcessor.java | 9 +---
.../apache/camel/processor/UnitOfWorkProducer.java | 3 +-
.../apache/camel/processor/UnmarshalProcessor.java | 10 +---
.../apache/camel/processor/WireTapProcessor.java | 10 +---
.../processor/aggregate/AggregateProcessor.java | 14 ++----
.../processor/idempotent/IdempotentConsumer.java | 9 +---
.../loadbalancer/LoadBalancerSupport.java | 12 +----
.../processor/validation/ValidatingProcessor.java | 19 +++-----
...yncProducer.java => AsyncProcessorSupport.java} | 23 +++++----
.../apache/camel/support/DefaultAsyncProducer.java | 4 +-
.../org/apache/camel/processor/MDCAsyncTest.java | 8 +---
.../processor/async/AsyncEndpointPolicyTest.java | 29 +++---------
...syncEndpointRoutingSlipBeanNonBlockingTest.java | 8 +---
.../client/AbstractAtomixClientProducer.java | 9 ++--
.../cxf/CxfConsumerContinuationTimeoutTest.java | 8 +---
.../hystrix/processor/HystrixProcessor.java | 11 +----
.../streams/util/UnwrapStreamProcessor.java | 9 +---
.../camel/spring/spi/TransactionErrorHandler.java | 32 +++----------
59 files changed, 175 insertions(+), 524 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java b/camel-core/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java
index ac9abf8..ab7298a 100644
--- a/camel-core/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java
@@ -17,23 +17,19 @@
package org.apache.camel.component.bean;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.NoSuchBeanException;
import org.apache.camel.Processor;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ServiceHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A {@link Processor} which converts the inbound exchange to a method
* invocation on a POJO
*/
-public abstract class AbstractBeanProcessor implements AsyncProcessor {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractBeanProcessor.class);
+public abstract class AbstractBeanProcessor extends AsyncProcessorSupport {
private final BeanHolder beanHolder;
private transient Processor processor;
@@ -64,10 +60,6 @@ public abstract class AbstractBeanProcessor implements AsyncProcessor {
return "BeanProcessor[" + beanHolder + "]";
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
// do we have an explicit method name we always should invoke (either configured on endpoint or as a header)
String explicitMethodName = exchange.getIn().getHeader(Exchange.BEAN_METHOD_NAME, method, String.class);
@@ -111,7 +103,9 @@ public abstract class AbstractBeanProcessor implements AsyncProcessor {
}
}
if (target != null) {
- LOG.trace("Using a custom adapter as bean invocation: {}", target);
+ if (log.isTraceEnabled()) {
+ log.trace("Using a custom adapter as bean invocation: {}", target);
+ }
try {
target.process(exchange);
} catch (Throwable e) {
@@ -138,11 +132,13 @@ public abstract class AbstractBeanProcessor implements AsyncProcessor {
// and therefore the message body contains a BeanInvocation object.
// However this can causes problem if we in a Camel route invokes another bean,
// so we must test whether BeanHolder and BeanInvocation is the same bean or not
- LOG.trace("Exchange IN body is a BeanInvocation instance: {}", beanInvoke);
+ if (log.isTraceEnabled()) {
+ log.trace("Exchange IN body is a BeanInvocation instance: {}", beanInvoke);
+ }
Class<?> clazz = beanInvoke.getMethod().getDeclaringClass();
boolean sameBean = clazz.isInstance(bean);
- if (LOG.isDebugEnabled()) {
- LOG.debug("BeanHolder bean: {} and beanInvocation bean: {} is same instance: {}", bean.getClass(), clazz, sameBean);
+ if (log.isDebugEnabled()) {
+ log.debug("BeanHolder bean: {} and beanInvocation bean: {} is same instance: {}", bean.getClass(), clazz, sameBean);
}
if (sameBean) {
try {
diff --git a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
index c96944d..160f19d 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
@@ -23,9 +23,9 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.AsyncProducer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.support.AsyncProcessorHelper;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.ServiceHelper;
@@ -66,15 +66,11 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer {
if (endpoint.getDetour() != null) {
// detour the exchange using synchronous processing
AsyncProcessor detour = AsyncProcessorConverterHelper.convert(endpoint.getDetour());
- AsyncProcessor ascb = new AsyncProcessor() {
+ AsyncProcessor ascb = new AsyncProcessorSupport() {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
return callback(exchange, callback, true);
}
- @Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
};
return new Pipeline(exchange.getContext(), Arrays.asList(detour, ascb)).process(exchange, callback);
}
diff --git a/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java b/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java
index 2af7fe9..5d0501f 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java
@@ -32,17 +32,16 @@ import org.apache.camel.Processor;
import org.apache.camel.builder.PredicateBuilder;
import org.apache.camel.component.bean.BeanInfo;
import org.apache.camel.component.bean.BeanProcessor;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.processor.CamelInternalProcessor;
-import org.apache.camel.support.AsyncProcessorHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
/**
* A {@link Processor} which is used for POJO @Consume where you can have multiple @Consume on the same endpoint/consumer
* and via predicate's can filter and call different methods.
*/
-public final class SubscribeMethodProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor> {
+public final class SubscribeMethodProcessor extends AsyncProcessorSupport implements Navigate<Processor> {
private final Endpoint endpoint;
private final Map<AsyncProcessor, Predicate> methods = new LinkedHashMap<>();
@@ -72,11 +71,6 @@ public final class SubscribeMethodProcessor extends ServiceSupport implements As
}
@Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- @Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
// evaluate which predicate matches and call the method
diff --git a/camel-core/src/main/java/org/apache/camel/impl/cloud/DefaultServiceCallProcessor.java b/camel-core/src/main/java/org/apache/camel/impl/cloud/DefaultServiceCallProcessor.java
index 190520c..5374b9a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/cloud/DefaultServiceCallProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/cloud/DefaultServiceCallProcessor.java
@@ -19,7 +19,6 @@ package org.apache.camel.impl.cloud;
import java.util.Map;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -29,16 +28,12 @@ import org.apache.camel.cloud.ServiceDefinition;
import org.apache.camel.cloud.ServiceLoadBalancer;
import org.apache.camel.language.simple.SimpleLanguage;
import org.apache.camel.processor.SendDynamicProcessor;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class DefaultServiceCallProcessor extends ServiceSupport implements AsyncProcessor {
- private static final Logger LOGGER = LoggerFactory.getLogger(DefaultServiceCallProcessor.class);
+public class DefaultServiceCallProcessor extends AsyncProcessorSupport {
private final ExchangePattern exchangePattern;
private final String name;
@@ -148,12 +143,6 @@ public class DefaultServiceCallProcessor extends ServiceSupport implements Async
// Processor
// *************************************
-
- @Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
final Message message = exchange.getIn();
@@ -183,7 +172,7 @@ public class DefaultServiceCallProcessor extends ServiceSupport implements Async
final int port = service.getPort();
final Map<String, String> meta = service.getMetadata();
- LOGGER.debug("Service {} active at server: {}:{}", name, host, port);
+ log.debug("Service {} active at server: {}:{}", name, host, port);
// set selected server as header
message.setHeader(ServiceCallConstants.SERVICE_HOST, host);
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java
index d2e1549..e5ecdde 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java
@@ -19,18 +19,15 @@ package org.apache.camel.processor;
import java.util.Set;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Processor;
+import org.apache.camel.spi.CamelLogger;
import org.apache.camel.spi.ExchangeFormatter;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.LogListener;
import org.apache.camel.spi.MaskingFormatter;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.spi.CamelLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.camel.support.AsyncProcessorSupport;
/**
* A {@link Processor} which just logs to a {@link CamelLogger} object which can be used
@@ -39,11 +36,10 @@ import org.slf4j.LoggerFactory;
* The name <tt>CamelLogger</tt> has been chosen to avoid any name clash with log kits
* which has a <tt>Logger</tt> class.
*/
-public class CamelLogProcessor implements AsyncProcessor, IdAware {
+public class CamelLogProcessor extends AsyncProcessorSupport implements IdAware {
- private static final Logger LOG = LoggerFactory.getLogger(CamelLogProcessor.class);
private String id;
- private CamelLogger log;
+ private CamelLogger logger;
private ExchangeFormatter formatter;
private MaskingFormatter maskingFormatter;
private Set<LogListener> listeners;
@@ -52,13 +48,13 @@ public class CamelLogProcessor implements AsyncProcessor, IdAware {
this(new CamelLogger(CamelLogProcessor.class.getName()));
}
- public CamelLogProcessor(CamelLogger log) {
+ public CamelLogProcessor(CamelLogger logger) {
this.formatter = new ToStringExchangeFormatter();
- this.log = log;
+ this.logger = logger;
}
- public CamelLogProcessor(CamelLogger log, ExchangeFormatter formatter, MaskingFormatter maskingFormatter, Set<LogListener> listeners) {
- this(log);
+ public CamelLogProcessor(CamelLogger logger, ExchangeFormatter formatter, MaskingFormatter maskingFormatter, Set<LogListener> listeners) {
+ this(logger);
this.formatter = formatter;
this.maskingFormatter = maskingFormatter;
this.listeners = listeners;
@@ -66,7 +62,7 @@ public class CamelLogProcessor implements AsyncProcessor, IdAware {
@Override
public String toString() {
- return "Logger[" + log + "]";
+ return "Logger[" + logger + "]";
}
public String getId() {
@@ -77,42 +73,38 @@ public class CamelLogProcessor implements AsyncProcessor, IdAware {
this.id = id;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
- if (log.shouldLog()) {
+ if (logger.shouldLog()) {
String output = formatter.format(exchange);
if (maskingFormatter != null) {
output = maskingFormatter.format(output);
}
output = fireListeners(exchange, output);
- log.log(output);
+ logger.log(output);
}
callback.done(true);
return true;
}
public void process(Exchange exchange, Throwable exception) {
- if (log.shouldLog()) {
+ if (logger.shouldLog()) {
String output = formatter.format(exchange);
if (maskingFormatter != null) {
output = maskingFormatter.format(output);
}
output = fireListeners(exchange, output);
- log.log(output, exception);
+ logger.log(output, exception);
}
}
public void process(Exchange exchange, String message) {
- if (log.shouldLog()) {
+ if (logger.shouldLog()) {
String output = formatter.format(exchange) + message;
if (maskingFormatter != null) {
output = maskingFormatter.format(output);
}
output = fireListeners(exchange, output);
- log.log(output);
+ logger.log(output);
}
}
@@ -125,12 +117,12 @@ public class CamelLogProcessor implements AsyncProcessor, IdAware {
continue;
}
try {
- String output = listener.onLog(exchange, log, message);
+ String output = listener.onLog(exchange, logger, message);
message = output != null ? output : message;
} catch (Throwable t) {
- LOG.warn("Ignoring an exception thrown by {}: {}", listener.getClass().getName(), t.getMessage());
- if (LOG.isDebugEnabled()) {
- LOG.debug("", t);
+ log.warn("Ignoring an exception thrown by {}: {}", listener.getClass().getName(), t.getMessage());
+ if (log.isDebugEnabled()) {
+ log.debug("", t);
}
}
}
@@ -138,19 +130,19 @@ public class CamelLogProcessor implements AsyncProcessor, IdAware {
}
public CamelLogger getLogger() {
- return log;
+ return logger;
}
public void setLogName(String logName) {
- log.setLogName(logName);
+ logger.setLogName(logName);
}
public void setLevel(LoggingLevel level) {
- log.setLevel(level);
+ logger.setLevel(level);
}
public void setMarker(String marker) {
- log.setMarker(marker);
+ logger.setMarker(marker);
}
public void setMaskingFormatter(MaskingFormatter maskingFormatter) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
index aa807d8..c751363 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
@@ -28,9 +28,8 @@ import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import static org.apache.camel.processor.PipelineHelper.continueProcessing;
@@ -39,7 +38,7 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing;
* they are true their processors are used, with a default otherwise clause used
* if none match.
*/
-public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware {
+public class ChoiceProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware {
private String id;
private final List<FilterProcessor> filters;
@@ -51,10 +50,6 @@ public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, N
this.otherwise = otherwise;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(final Exchange exchange, final AsyncCallback callback) {
Iterator<Processor> processors = next().iterator();
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
index b002659..e813740 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
@@ -17,7 +17,6 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
@@ -25,10 +24,9 @@ import org.apache.camel.impl.DefaultClaimCheckRepository;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.spi.ClaimCheckRepository;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
/**
@@ -39,7 +37,7 @@ import org.apache.camel.util.ObjectHelper;
* This guards against concurrent and thread-safe issues. For off-memory persistent storage of data, then use
* any of the many Camel components that support persistent storage, and do not use this Claim Check EIP implementation.
*/
-public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
+public class ClaimCheckProcessor extends AsyncProcessorSupport implements IdAware, CamelContextAware {
private CamelContext camelContext;
private String id;
@@ -100,10 +98,6 @@ public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcesso
this.filter = filter;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
// the repository is scoped per exchange
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
index 8f3d0f4..208bb58 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
@@ -25,10 +25,9 @@ import org.apache.camel.DelegateProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
-import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.support.AsyncProcessorConverterHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
/**
* A Delegate pattern which delegates processing to a nested {@link AsyncProcessor} which can
@@ -39,7 +38,7 @@ import org.apache.camel.support.ServiceSupport;
* @see DelegateSyncProcessor
* @see org.apache.camel.processor.DelegateProcessor
*/
-public class DelegateAsyncProcessor extends ServiceSupport implements DelegateProcessor, AsyncProcessor, Navigate<Processor> {
+public class DelegateAsyncProcessor extends AsyncProcessorSupport implements DelegateProcessor, Navigate<Processor> {
protected AsyncProcessor processor;
public DelegateAsyncProcessor() {
@@ -85,12 +84,6 @@ public class DelegateAsyncProcessor extends ServiceSupport implements DelegatePr
ServiceHelper.stopAndShutdownServices(processor);
}
- @Override
- public void process(Exchange exchange) throws Exception {
- final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
- awaitManager.process(this, exchange);
- }
-
public boolean process(final Exchange exchange, final AsyncCallback callback) {
return processor.process(exchange, callback);
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
index fcb5293..703479f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
@@ -32,12 +32,11 @@ import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.StopWatch;
import static org.apache.camel.support.ExchangeHelper.copyResultsPreservePattern;
@@ -54,7 +53,7 @@ import static org.apache.camel.support.ExchangeHelper.copyResultsPreservePattern
*
* @see PollEnricher
*/
-public class Enricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
+public class Enricher extends AsyncProcessorSupport implements IdAware, CamelContextAware {
private CamelContext camelContext;
private String id;
@@ -134,10 +133,6 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, IdAware,
this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
/**
* Enriches the input data (<code>exchange</code>) by first obtaining
* additional data from an endpoint represented by an endpoint
diff --git a/camel-core/src/main/java/org/apache/camel/processor/EvaluateExpressionProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/EvaluateExpressionProcessor.java
index 80a5ee7..1b6f0a5 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/EvaluateExpressionProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/EvaluateExpressionProcessor.java
@@ -17,11 +17,10 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Traceable;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
/**
* A {@link org.apache.camel.Processor} which evaluates an {@link Expression}
@@ -31,7 +30,7 @@ import org.apache.camel.support.AsyncProcessorHelper;
* This processor will in case of evaluation exceptions, set the caused exception
* on the {@link Exchange}.
*/
-public class EvaluateExpressionProcessor implements AsyncProcessor, Traceable {
+public class EvaluateExpressionProcessor extends AsyncProcessorSupport implements Traceable {
private final Expression expression;
@@ -40,11 +39,6 @@ public class EvaluateExpressionProcessor implements AsyncProcessor, Traceable {
}
@Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- @Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
Object result = expression.evaluate(exchange, Object.class);
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java
index b8bfd76..8b7bf30 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java
@@ -17,17 +17,15 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
/**
* Processor to set {@link org.apache.camel.ExchangePattern} on the {@link org.apache.camel.Exchange}.
*/
-public class ExchangePatternProcessor extends ServiceSupport implements AsyncProcessor, IdAware {
+public class ExchangePatternProcessor extends AsyncProcessorSupport implements IdAware {
private String id;
private ExchangePattern exchangePattern = ExchangePattern.InOnly;
@@ -54,10 +52,6 @@ public class ExchangePatternProcessor extends ServiceSupport implements AsyncPro
return exchangePattern;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
exchange.setPattern(exchangePattern);
callback.done(true);
diff --git a/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java
index fc0fa99..d2972ea 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java
@@ -19,21 +19,19 @@ package org.apache.camel.processor;
import java.util.Set;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.LogListener;
import org.apache.camel.spi.MaskingFormatter;
-import org.apache.camel.support.AsyncProcessorHelper;
import org.apache.camel.spi.CamelLogger;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
/**
* A processor which evaluates an {@link Expression} and logs it.
*/
-public class LogProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class LogProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
private String id;
private final Expression expression;
@@ -48,10 +46,6 @@ public class LogProcessor extends ServiceSupport implements AsyncProcessor, Trac
this.listeners = listeners;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java
index 86e8f99..6565acf 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java
@@ -17,7 +17,6 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
@@ -26,16 +25,15 @@ import org.apache.camel.Traceable;
import org.apache.camel.converter.stream.OutputStreamBuilder;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
/**
* Marshals the body of the incoming message using the given
* <a href="http://camel.apache.org/data-format.html">data format</a>
*/
-public class MarshalProcessor extends ServiceSupport implements AsyncProcessor, Traceable, CamelContextAware, IdAware {
+public class MarshalProcessor extends AsyncProcessorSupport implements Traceable, CamelContextAware, IdAware {
private String id;
private CamelContext camelContext;
private final DataFormat dataFormat;
@@ -44,10 +42,6 @@ public class MarshalProcessor extends ServiceSupport implements AsyncProcessor,
this.dataFormat = dataFormat;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
ObjectHelper.notNull(dataFormat, "dataFormat");
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 5e6c991..df08df0 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -54,14 +54,12 @@ import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.CastUtils;
-import org.apache.camel.util.FilterIterator;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.StopWatch;
@@ -75,7 +73,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
* endpoints, each endpoint receiving a copy of the message exchange.
* @see Pipeline
*/
-public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware {
+public class MulticastProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware {
/**
* Class that represent each step in the multicast route to do
@@ -209,10 +207,6 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
return camelContext;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
Iterable<ProcessorExchangePair> pairs;
try {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index a130c8f..7c478f8 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -20,7 +20,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -31,10 +30,9 @@ import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.support.SynchronizationAdapter;
import static org.apache.camel.util.ObjectHelper.notNull;
@@ -42,7 +40,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
/**
* Processor implementing <a href="http://camel.apache.org/oncompletion.html">onCompletion</a>.
*/
-public class OnCompletionProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class OnCompletionProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
private final CamelContext camelContext;
private String id;
@@ -100,10 +98,6 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
this.id = id;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
if (processor != null) {
// register callback
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
index e544e0f..0d7d4e5 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -31,11 +31,10 @@ import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import static org.apache.camel.processor.PipelineHelper.continueProcessing;
@@ -43,7 +42,7 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing;
* Creates a Pipeline pattern where the output of the previous step is sent as
* input to the next step, reusing the same message exchanges
*/
-public class Pipeline extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware {
+public class Pipeline extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware {
private final CamelContext camelContext;
private List<AsyncProcessor> processors;
@@ -81,11 +80,6 @@ public class Pipeline extends ServiceSupport implements AsyncProcessor, Navigate
}
@Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- @Override
public boolean process(Exchange exchange, AsyncCallback callback) {
ReactiveHelper.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
"Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index 003ba25..fea120c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -17,7 +17,6 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
@@ -32,13 +31,12 @@ import org.apache.camel.spi.ConsumerCache;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.EventDrivenPollingConsumer;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import static org.apache.camel.support.ExchangeHelper.copyResultsPreservePattern;
@@ -54,7 +52,7 @@ import static org.apache.camel.support.ExchangeHelper.copyResultsPreservePattern
*
* @see Enricher
*/
-public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
+public class PollEnricher extends AsyncProcessorSupport implements IdAware, CamelContextAware {
private CamelContext camelContext;
private ConsumerCache consumerCache;
@@ -161,10 +159,6 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
/**
* Enriches the input data (<code>exchange</code>) by first obtaining
* additional data from an endpoint represented by an endpoint
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index 75314f7..82bc62d 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -20,7 +20,6 @@ import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -32,11 +31,10 @@ import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ProducerCache;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.ObjectHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.StringHelper;
import static org.apache.camel.util.ObjectHelper.notNull;
@@ -47,7 +45,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
* pattern where the list of actual endpoints to send a message exchange to are
* dependent on some dynamic expression.
*/
-public class RecipientList extends ServiceSupport implements AsyncProcessor, IdAware {
+public class RecipientList extends AsyncProcessorSupport implements IdAware {
private static final String IGNORE_DELIMITER_MARKER = "false";
private final CamelContext camelContext;
@@ -109,10 +107,6 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor, IdA
this.id = id;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
if (!isStarted()) {
throw new IllegalStateException("RecipientList has not been started: " + this);
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index 7851ca0..43bf18c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -135,7 +135,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
}
}
- public void process(Exchange exchange) throws Exception {
+ public void process(Exchange exchange) {
if (output == null) {
// no output then just return
return;
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java
index 8efad13..49752f9 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java
@@ -17,18 +17,16 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
/**
* A processor which removes the header from the IN or OUT message
*/
-public class RemoveHeaderProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class RemoveHeaderProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
private final String headerName;
private String id;
@@ -36,10 +34,6 @@ public class RemoveHeaderProcessor extends ServiceSupport implements AsyncProces
this.headerName = headerName;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java
index 8bfe5f8..04a48d9 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java
@@ -17,18 +17,16 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
/**
* A processor which removes one or more headers from the IN or OUT message
*/
-public class RemoveHeadersProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class RemoveHeadersProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
private String id;
private final String pattern;
private final String[] excludePattern;
@@ -38,10 +36,6 @@ public class RemoveHeadersProcessor extends ServiceSupport implements AsyncProce
this.excludePattern = excludePattern;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java
index 275068b..1556fe7 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java
@@ -17,17 +17,15 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
/**
* A processor which removes one or more properties from the exchange
*/
-public class RemovePropertiesProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class RemovePropertiesProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
private String id;
private final String pattern;
private final String[] excludePattern;
@@ -37,10 +35,6 @@ public class RemovePropertiesProcessor extends ServiceSupport implements AsyncPr
this.excludePattern = excludePattern;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java
index 2958e5c..2508e2a 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java
@@ -17,17 +17,15 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
/**
* A processor which removes the property from the exchange
*/
-public class RemovePropertyProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class RemovePropertyProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
private String id;
private final String propertyName;
@@ -35,10 +33,6 @@ public class RemovePropertyProcessor extends ServiceSupport implements AsyncProc
this.propertyName = propertyName;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java b/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
index a779716..c4c59d6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
@@ -45,18 +45,17 @@ import org.apache.camel.Traceable;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExpressionComparator;
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
/**
* An implementation of the <a href="http://camel.apache.org/resequencer.html">Resequencer</a>
* which can reorder messages within a batch.
*/
-public class Resequencer extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, IdAware, Traceable {
+public class Resequencer extends AsyncProcessorSupport implements Navigate<Processor>, IdAware, Traceable {
public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
public static final int DEFAULT_BATCH_SIZE = 100;
@@ -315,10 +314,6 @@ public class Resequencer extends ServiceSupport implements AsyncProcessor, Navig
collection.clear();
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
/**
* Enqueues an exchange for later batch processing.
*/
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RollbackProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RollbackProcessor.java
index a684f3f..e65a141 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RollbackProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RollbackProcessor.java
@@ -17,18 +17,16 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.RollbackExchangeException;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
/**
* Processor for marking an {@link org.apache.camel.Exchange} to rollback.
*/
-public class RollbackProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class RollbackProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
private String id;
private boolean markRollbackOnly;
@@ -42,10 +40,6 @@ public class RollbackProcessor extends ServiceSupport implements AsyncProcessor,
this.message = message;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
if (isMarkRollbackOnlyLast()) {
// only mark the last route (current) as rollback
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index a928f49..8915457 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -20,7 +20,6 @@ import java.util.Iterator;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
-import org.apache.camel.AsyncProducer;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -35,12 +34,12 @@ import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.support.ObjectHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import static org.apache.camel.processor.PipelineHelper.continueProcessing;
import static org.apache.camel.util.ObjectHelper.notNull;
@@ -54,7 +53,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
* as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the
* pipeline to ensure it works the same and the async routing engine is flawless.
*/
-public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdAware {
protected String id;
protected ProducerCache producerCache;
@@ -158,10 +157,6 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
return "routingSlip[" + expression + "]";
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
if (!isStarted()) {
exchange.setException(new IllegalStateException("RoutingSlip has not been started: " + this));
@@ -487,12 +482,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
* Embedded processor that routes to the routing slip that has been set via the
* exchange property {@link Exchange#SLIP_PRODUCER}.
*/
- private final class RoutingSlipProcessor implements AsyncProcessor {
-
- @Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
+ private final class RoutingSlipProcessor extends AsyncProcessorSupport {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java b/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
index 42913a9..99f34a3 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
@@ -20,12 +20,10 @@ import java.util.Locale;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
/**
* A <code>SamplingThrottler</code> is a special kind of throttler. It also
@@ -38,7 +36,7 @@ import org.apache.camel.support.ServiceSupport;
* an exchange stream, rough consolidation of noisy and bursty exchange traffic
* or where queuing of throttled exchanges is undesirable.
*/
-public class SamplingThrottler extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class SamplingThrottler extends AsyncProcessorSupport implements Traceable, IdAware {
private String id;
private long messageFrequency;
@@ -71,14 +69,6 @@ public class SamplingThrottler extends ServiceSupport implements AsyncProcessor,
}
@Override
- protected void doStart() throws Exception {
- }
-
- @Override
- protected void doStop() throws Exception {
- }
-
- @Override
public String toString() {
if (messageFrequency > 0) {
return "SamplingThrottler[1 exchange per: " + messageFrequency + " messages received]";
@@ -116,11 +106,6 @@ public class SamplingThrottler extends ServiceSupport implements AsyncProcessor,
}
@Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- @Override
public boolean process(Exchange exchange, AsyncCallback callback) {
boolean doSend = false;
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ScriptProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ScriptProcessor.java
index 41850c2..436442b 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ScriptProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ScriptProcessor.java
@@ -17,19 +17,17 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
/**
* A processor which executes the script as an expression and does not change the message body.
*/
-public class ScriptProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class ScriptProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
private String id;
private final Expression expression;
@@ -38,10 +36,6 @@ public class ScriptProcessor extends ServiceSupport implements AsyncProcessor, T
this.expression = expression;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
expression.evaluate(exchange, Object.class);
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
index 1499e88..ece4662 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
@@ -17,7 +17,6 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Endpoint;
@@ -32,11 +31,10 @@ import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.SendDynamicAware;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.EndpointHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.URISupport;
/**
@@ -44,7 +42,7 @@ import org.apache.camel.util.URISupport;
*
* @see org.apache.camel.processor.SendProcessor
*/
-public class SendDynamicProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
+public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAware, CamelContextAware {
protected SendDynamicAware dynamicAware;
protected CamelContext camelContext;
@@ -80,10 +78,6 @@ public class SendDynamicProcessor extends ServiceSupport implements AsyncProcess
this.id = id;
}
- public void process(final Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, final AsyncCallback callback) {
if (!isStarted()) {
exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this));
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
index b711ad7..48695cc 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -19,7 +19,6 @@ package org.apache.camel.processor;
import java.net.URISyntaxException;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.AsyncProducer;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
@@ -32,11 +31,10 @@ import org.apache.camel.impl.InterceptSendToEndpoint;
import org.apache.camel.impl.DefaultProducerCache;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ProducerCache;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.EndpointHelper;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.URISupport;
@@ -46,7 +44,7 @@ import org.apache.camel.util.URISupport;
*
* @see SendDynamicProcessor
*/
-public class SendProcessor extends ServiceSupport implements AsyncProcessor, Traceable, EndpointAware, IdAware {
+public class SendProcessor extends AsyncProcessorSupport implements Traceable, EndpointAware, IdAware {
protected transient String traceLabelToString;
protected final CamelContext camelContext;
@@ -101,10 +99,6 @@ public class SendProcessor extends ServiceSupport implements AsyncProcessor, Tra
return destination;
}
- public void process(final Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, final AsyncCallback callback) {
if (!isStarted()) {
exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this));
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java
index 3253415..5481434 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java
@@ -17,21 +17,19 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ServiceSupport;
/**
* A processor which sets the body on the IN or OUT message with an {@link Expression}
*/
-public class SetBodyProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class SetBodyProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
private String id;
private final Expression expression;
@@ -39,10 +37,6 @@ public class SetBodyProcessor extends ServiceSupport implements AsyncProcessor,
this.expression = expression;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java
index 9c713ee..d4c674e 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java
@@ -17,20 +17,18 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
/**
* A processor which sets the header on the IN or OUT message with an {@link org.apache.camel.Expression}
*/
-public class SetHeaderProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class SetHeaderProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
private String id;
private final Expression headerName;
private final Expression expression;
@@ -42,10 +40,6 @@ public class SetHeaderProcessor extends ServiceSupport implements AsyncProcessor
ObjectHelper.notNull(expression, "expression");
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java
index 1ea4ffc..a059b45 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java
@@ -17,19 +17,17 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
/**
* A processor which sets the property on the exchange with an {@link org.apache.camel.Expression}
*/
-public class SetPropertyProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class SetPropertyProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
private String id;
private final Expression propertyName;
private final Expression expression;
@@ -41,10 +39,6 @@ public class SetPropertyProcessor extends ServiceSupport implements AsyncProcess
ObjectHelper.notNull(expression, "expression");
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SortProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SortProcessor.java
index 70a7369..2d2423c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SortProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SortProcessor.java
@@ -20,18 +20,16 @@ import java.util.Comparator;
import java.util.List;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
/**
* A processor that sorts the expression using a comparator
*/
-public class SortProcessor<T> extends ServiceSupport implements AsyncProcessor, IdAware, org.apache.camel.Traceable {
+public class SortProcessor<T> extends AsyncProcessorSupport implements IdAware, org.apache.camel.Traceable {
private String id;
private final Expression expression;
@@ -42,10 +40,6 @@ public class SortProcessor<T> extends ServiceSupport implements AsyncProcessor,
this.comparator = comparator;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/StopProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/StopProcessor.java
index 66b6724..d74aaa8 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/StopProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/StopProcessor.java
@@ -17,23 +17,17 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
/**
* Stops continue processing the route and marks it as complete.
*/
-public class StopProcessor extends ServiceSupport implements AsyncProcessor, IdAware {
+public class StopProcessor extends AsyncProcessorSupport implements IdAware {
private String id;
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
// mark the exchange to stop continue routing
exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
diff --git a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
index aa57c60..67ac5eb 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
@@ -24,7 +24,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
@@ -37,10 +36,9 @@ import org.apache.camel.processor.resequencer.SequenceElementComparator;
import org.apache.camel.processor.resequencer.SequenceSender;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
/**
@@ -64,7 +62,7 @@ import org.apache.camel.util.ObjectHelper;
*
* @see ResequencerEngine
*/
-public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>, AsyncProcessor, Navigate<Processor>, Traceable, IdAware {
+public class StreamResequencer extends AsyncProcessorSupport implements SequenceSender<Exchange>, Navigate<Processor>, Traceable, IdAware {
private String id;
private final CamelContext camelContext;
@@ -212,10 +210,6 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender<
processor.process(exchange);
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
while (engine.size() >= capacity) {
try {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
index 3ff9e8a..b08cae0 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
@@ -22,13 +22,11 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.concurrent.Rejectable;
@@ -55,7 +53,7 @@ import org.apache.camel.util.concurrent.Rejectable;
* will not be free to process a new exchange, as its processing the current exchange.</li>
* </ul>
*/
-public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor, IdAware {
+public class ThreadsProcessor extends AsyncProcessorSupport implements IdAware {
private String id;
private final CamelContext camelContext;
@@ -111,10 +109,6 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor,
this.rejectedPolicy = rejectedPolicy;
}
- public void process(final Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
if (shutdown.get()) {
throw new IllegalStateException("ThreadsProcessor is not running.");
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index d713f92..ac1e391 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -27,15 +27,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
/**
@@ -58,7 +56,7 @@ import org.apache.camel.util.ObjectHelper;
* callers point of view in the last timePeriodMillis no more than
* maxRequestsPerPeriod have been allowed to be acquired.
*/
-public class Throttler extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class Throttler extends AsyncProcessorSupport implements Traceable, IdAware {
private static final String DEFAULT_KEY = "CamelThrottlerDefaultKey";
@@ -100,11 +98,6 @@ public class Throttler extends ServiceSupport implements AsyncProcessor, Traceab
}
@Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- @Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
long queuedStart = 0;
if (log.isTraceEnabled()) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java b/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
index 7414f4c..7828711 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
@@ -28,13 +28,14 @@ import org.apache.camel.Exchange;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.AsyncProcessorHelper;
import org.apache.camel.spi.CamelLogger;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
/**
* A logger for logging message throughput.
*/
-public class ThroughputLogger extends ServiceSupport implements AsyncProcessor, IdAware {
+public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProcessor, IdAware {
private String id;
private final AtomicInteger receivedCounter = new AtomicInteger();
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java
index 5f248b5..858569b 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java
@@ -19,7 +19,6 @@ package org.apache.camel.processor;
import java.lang.reflect.Constructor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
@@ -27,14 +26,13 @@ import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
-import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
/**
* The processor which sets an {@link Exception} on the {@link Exchange}
*/
-public class ThrowExceptionProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware, CamelContextAware {
+public class ThrowExceptionProcessor extends AsyncProcessorSupport implements Traceable, IdAware, CamelContextAware {
private String id;
private CamelContext camelContext;
private Expression simple;
@@ -52,10 +50,6 @@ public class ThrowExceptionProcessor extends ServiceSupport implements AsyncProc
this.message = message;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
Exception cause = exception;
diff --git a/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java
index 8a71aa0..901deee 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java
@@ -17,22 +17,20 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
/**
* A processor which sets the body on the OUT message with an {@link Expression}.
*/
-public class TransformProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+public class TransformProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
private String id;
private final Expression expression;
@@ -41,10 +39,6 @@ public class TransformProcessor extends ServiceSupport implements AsyncProcessor
this.expression = expression;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
Object newBody = expression.evaluate(exchange, Object.class);
diff --git a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
index e8178b1..e0d6ff4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -28,16 +28,15 @@ import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
/**
* Implements try/catch/finally type processing
*/
-public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware {
+public class TryProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware {
protected String id;
protected final Processor tryProcessor;
@@ -60,10 +59,6 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi
return "doTry";
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
ReactiveHelper.schedule(new TryState(exchange, callback));
diff --git a/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java b/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
index 8904c4b..3229aeb 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
@@ -20,7 +20,6 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
-import org.apache.camel.support.AsyncProcessorHelper;
import org.apache.camel.support.ServiceHelper;
/**
@@ -49,7 +48,7 @@ public final class UnitOfWorkProducer implements Producer {
}
public void process(final Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(processor, exchange);
+ processor.process(exchange);
}
public void start() throws Exception {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/UnmarshalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/UnmarshalProcessor.java
index 98ad961..72485ea 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/UnmarshalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/UnmarshalProcessor.java
@@ -20,7 +20,6 @@ import java.io.InputStream;
import java.util.Iterator;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
@@ -29,9 +28,8 @@ import org.apache.camel.RuntimeCamelException;
import org.apache.camel.Traceable;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
@@ -39,7 +37,7 @@ import org.apache.camel.util.ObjectHelper;
* Unmarshals the body of the incoming message using the given
* <a href="http://camel.apache.org/data-format.html">data format</a>
*/
-public class UnmarshalProcessor extends ServiceSupport implements AsyncProcessor, Traceable, CamelContextAware, IdAware {
+public class UnmarshalProcessor extends AsyncProcessorSupport implements Traceable, CamelContextAware, IdAware {
private String id;
private CamelContext camelContext;
private final DataFormat dataFormat;
@@ -48,10 +46,6 @@ public class UnmarshalProcessor extends ServiceSupport implements AsyncProcessor
this.dataFormat = dataFormat;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
ObjectHelper.notNull(dataFormat, "dataFormat");
diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 8d716a6..2282467 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.LongAdder;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
@@ -39,17 +38,16 @@ import org.apache.camel.Traceable;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ShutdownAware;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
/**
* Processor for wire tapping exchanges to an endpoint destination.
*/
-public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, Traceable, ShutdownAware, IdAware, CamelContextAware {
+public class WireTapProcessor extends AsyncProcessorSupport implements Traceable, ShutdownAware, IdAware, CamelContextAware {
private String id;
private CamelContext camelContext;
@@ -127,10 +125,6 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
return dynamicProcessor.getEndpointUtilizationStatistics();
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(final Exchange exchange, final AsyncCallback callback) {
if (!isStarted()) {
throw new IllegalStateException("WireTapProcessor has not been started: " + this);
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 0039c49..54b4f2d 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -35,7 +35,6 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
@@ -50,6 +49,7 @@ import org.apache.camel.ProducerTemplate;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.TimeoutMap;
import org.apache.camel.Traceable;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.IdAware;
@@ -58,13 +58,11 @@ import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.spi.ShutdownPrepared;
import org.apache.camel.spi.Synchronization;
-import org.apache.camel.support.AsyncProcessorHelper;
import org.apache.camel.support.DefaultTimeoutMap;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.LRUCacheFactory;
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.TimeUtils;
@@ -84,7 +82,7 @@ import org.apache.camel.util.TimeUtils;
* and older prices are discarded). Another idea is to combine line item messages
* together into a single invoice message.
*/
-public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared, ShutdownAware, IdAware {
+public class AggregateProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, ShutdownPrepared, ShutdownAware, IdAware {
public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker";
@@ -259,13 +257,9 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
this.id = id;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
- doProcess(exchange);
+ doProcess(exchange, callback);
} catch (Throwable e) {
exchange.setException(e);
}
@@ -273,7 +267,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
return true;
}
- protected void doProcess(Exchange exchange) throws Exception {
+ protected void doProcess(Exchange exchange, AsyncCallback callback) throws Exception {
if (getStatistics().isStatisticsEnabled()) {
totalIn.incrementAndGet();
diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
index 3009fcf..953b241 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
@@ -28,13 +28,12 @@ import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.support.AsyncProcessorHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
/**
* An implementation of the <a
@@ -44,7 +43,7 @@ import org.apache.camel.support.ServiceSupport;
*
* @see org.apache.camel.spi.IdempotentRepository
*/
-public class IdempotentConsumer extends ServiceSupport implements CamelContextAware, AsyncProcessor, Navigate<Processor>, IdAware {
+public class IdempotentConsumer extends AsyncProcessorSupport implements CamelContextAware, Navigate<Processor>, IdAware {
private CamelContext camelContext;
private String id;
@@ -91,10 +90,6 @@ public class IdempotentConsumer extends ServiceSupport implements CamelContextAw
this.id = id;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(final Exchange exchange, final AsyncCallback callback) {
final AsyncCallback target;
diff --git a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
index 83292bd..85eafb4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
@@ -20,15 +20,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.AsyncProcessorHelper;
import org.apache.camel.support.ServiceHelper;
-import org.apache.camel.support.ServiceSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A default base class for a {@link LoadBalancer} implementation.
@@ -38,9 +34,8 @@ import org.slf4j.LoggerFactory;
* Consider using the {@link SimpleLoadBalancerSupport} if your load balancer does not by nature
* support asynchronous routing.
*/
-public abstract class LoadBalancerSupport extends ServiceSupport implements LoadBalancer, Navigate<Processor>, IdAware {
+public abstract class LoadBalancerSupport extends AsyncProcessorSupport implements LoadBalancer, Navigate<Processor>, IdAware {
- protected final Logger log = LoggerFactory.getLogger(getClass());
private final List<Processor> processors = new CopyOnWriteArrayList<>();
private String id;
@@ -91,7 +86,4 @@ public abstract class LoadBalancerSupport extends ServiceSupport implements Load
}
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java
index 3123556..b2704e1 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java
@@ -43,16 +43,13 @@ import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.ExpectedBodyTypeException;
import org.apache.camel.RuntimeTransformException;
import org.apache.camel.TypeConverter;
import org.apache.camel.converter.jaxp.XmlConverter;
-import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.IOHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.apache.camel.processor.validation.SchemaReader.ACCESS_EXTERNAL_DTD;
@@ -60,8 +57,8 @@ import static org.apache.camel.processor.validation.SchemaReader.ACCESS_EXTERNAL
* A processor which validates the XML version of the inbound message body
* against some schema either in XSD or RelaxNG
*/
-public class ValidatingProcessor implements AsyncProcessor {
- private static final Logger LOG = LoggerFactory.getLogger(ValidatingProcessor.class);
+public class ValidatingProcessor extends AsyncProcessorSupport {
+
private final SchemaReader schemaReader;
private ValidatorErrorHandler errorHandler = new DefaultValidationErrorHandler();
private final XmlConverter converter = new XmlConverter();
@@ -79,10 +76,6 @@ public class ValidatingProcessor implements AsyncProcessor {
this.schemaReader = schemaReader;
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
doProcess(exchange);
@@ -105,11 +98,11 @@ public class ValidatingProcessor implements AsyncProcessor {
// turn off access to external schema by default
if (!Boolean.parseBoolean(exchange.getContext().getGlobalOptions().get(ACCESS_EXTERNAL_DTD))) {
try {
- LOG.debug("Configuring Validator to not allow access to external DTD/Schema");
+ log.debug("Configuring Validator to not allow access to external DTD/Schema");
validator.setProperty(XMLConstants.ACCESS_EXTERNAL_DTD, "");
validator.setProperty(XMLConstants.ACCESS_EXTERNAL_SCHEMA, "");
} catch (SAXException e) {
- LOG.warn(e.getMessage(), e);
+ log.warn(e.getMessage(), e);
}
}
@@ -158,7 +151,7 @@ public class ValidatingProcessor implements AsyncProcessor {
validator.setErrorHandler(handler);
try {
- LOG.trace("Validating {}", source);
+ log.trace("Validating {}", source);
validator.validate(source, result);
handler.handleErrors(exchange, schema, result);
} catch (SAXParseException e) {
diff --git a/camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java b/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
similarity index 66%
copy from camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
copy to camel-core/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
index 83d4242..e75f4e1 100644
--- a/camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
@@ -16,21 +16,24 @@
*/
package org.apache.camel.support;
-import org.apache.camel.AsyncProducer;
-import org.apache.camel.Endpoint;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
-/**
- * A default implementation of {@link org.apache.camel.Producer} for implementation inheritance,
- * which can process {@link Exchange}s asynchronously.
- */
-public abstract class DefaultAsyncProducer extends DefaultProducer implements AsyncProducer {
+public abstract class AsyncProcessorSupport extends ServiceSupport implements AsyncProcessor {
- public DefaultAsyncProducer(Endpoint endpoint) {
- super(endpoint);
+ @Override
+ protected void doStart() throws Exception {
}
+ @Override
+ protected void doStop() throws Exception {
+ }
+
+ @Override
public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
+ AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
+ awaitManager.process(this, exchange);
}
+
}
diff --git a/camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java b/camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
index 83d4242..f566eb4 100644
--- a/camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
@@ -19,6 +19,7 @@ package org.apache.camel.support;
import org.apache.camel.AsyncProducer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
/**
* A default implementation of {@link org.apache.camel.Producer} for implementation inheritance,
@@ -31,6 +32,7 @@ public abstract class DefaultAsyncProducer extends DefaultProducer implements As
}
public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
+ AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
+ awaitManager.process(this, exchange);
}
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MDCAsyncTest.java b/camel-core/src/test/java/org/apache/camel/processor/MDCAsyncTest.java
index f5cfdd1..32f15eb 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/MDCAsyncTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/MDCAsyncTest.java
@@ -84,13 +84,7 @@ public class MDCAsyncTest extends ContextTestSupport {
@Override
public boolean process(Exchange exchange, final AsyncCallback callback) {
- EXECUTOR.submit(new Runnable() {
- @Override
- public void run() {
- callback.done(false);
- }
- });
-
+ EXECUTOR.submit(() -> callback.done(false));
return false;
}
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
index 9dea914..ba59bb1 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
@@ -24,10 +24,10 @@ import org.apache.camel.NamedNode;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.Policy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.support.AsyncProcessorHelper;
import org.junit.Test;
public class AsyncEndpointPolicyTest extends ContextTestSupport {
@@ -121,31 +121,16 @@ public class AsyncEndpointPolicyTest extends ContextTestSupport {
// let the original processor continue routing
exchange.getIn().setHeader(name, "was wrapped");
AsyncProcessor ap = AsyncProcessorConverterHelper.convert(processor);
- boolean sync = ap.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- // we only have to handle async completion of this policy
- if (doneSync) {
- return;
- }
-
- exchange.getIn().setHeader(name, "policy finished execution");
- callback.done(false);
- }
+ ap.process(exchange, doneSync -> {
+ exchange.getIn().setHeader(name, "policy finished execution");
+ callback.done(false);
});
-
- if (!sync) {
- // continue routing async
- return false;
- }
-
- // we are done synchronously, so do our after work and invoke the callback
- exchange.getIn().setHeader(name, "policy finished execution");
- callback.done(true);
- return true;
+ return false;
}
public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
+ final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
+ awaitManager.process(this, exchange);
}
};
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoutingSlipBeanNonBlockingTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoutingSlipBeanNonBlockingTest.java
index da282e1..e438299 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoutingSlipBeanNonBlockingTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoutingSlipBeanNonBlockingTest.java
@@ -32,6 +32,7 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.processor.SendProcessor;
import org.apache.camel.support.AsyncProcessorHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ServiceHelper;
import org.junit.Assert;
import org.junit.Test;
@@ -113,7 +114,7 @@ public class AsyncEndpointRoutingSlipBeanNonBlockingTest extends ContextTestSupp
}
}
- private class MyAsyncProcessor implements AsyncProcessor {
+ private class MyAsyncProcessor extends AsyncProcessorSupport {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
@@ -122,10 +123,5 @@ public class AsyncEndpointRoutingSlipBeanNonBlockingTest extends ContextTestSupp
return false;
}
-
- @Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
}
}
\ No newline at end of file
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java
index 79a7aa8..46035a5 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java
@@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentMap;
import io.atomix.resource.Resource;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
+import org.apache.camel.AsyncProducer;
import org.apache.camel.Exchange;
import org.apache.camel.InvokeOnHeader;
import org.apache.camel.Message;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.atomix.AtomixAsyncMessageProcessor;
+import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.DefaultProducer;
import org.apache.camel.support.AsyncProcessorHelper;
import org.apache.camel.util.ObjectHelper;
@@ -37,7 +39,7 @@ import org.apache.camel.util.ObjectHelper;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT;
import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME;
-public abstract class AbstractAtomixClientProducer<E extends AbstractAtomixClientEndpoint, R extends Resource> extends DefaultProducer implements AsyncProcessor {
+public abstract class AbstractAtomixClientProducer<E extends AbstractAtomixClientEndpoint, R extends Resource> extends DefaultAsyncProducer {
private final Map<String, AtomixAsyncMessageProcessor> processors;
private ConcurrentMap<String, R> resources;
@@ -64,11 +66,6 @@ public abstract class AbstractAtomixClientProducer<E extends AbstractAtomixClien
}
@Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- @Override
public boolean process(Exchange exchange, AsyncCallback callback) {
final Message message = exchange.getIn();
final String key = getProcessorKey(message);
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerContinuationTimeoutTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerContinuationTimeoutTest.java
index fea51bc..7c5f9db 100644
--- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerContinuationTimeoutTest.java
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerContinuationTimeoutTest.java
@@ -23,6 +23,7 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.support.AsyncProcessorHelper;
import org.junit.Test;
@@ -61,12 +62,7 @@ public class CxfConsumerContinuationTimeoutTest extends CamelTestSupport {
.setBody(constant("Sensitive Data"))
.to(simpleEndpointURI + "&continuationTimeout=5000&dataFormat=RAW");
- from(simpleEndpointURI + "&continuationTimeout=5000&dataFormat=RAW").process(new AsyncProcessor() {
- @Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
+ from(simpleEndpointURI + "&continuationTimeout=5000&dataFormat=RAW").process(new AsyncProcessorSupport() {
@Override
public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
Message in = exchange.getIn();
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
index 10b3036..ca16176 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
@@ -25,21 +25,19 @@ import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandMetrics;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.support.AsyncProcessorHelper;
/**
* Implementation of the Hystrix EIP.
*/
@ManagedResource(description = "Managed Hystrix Processor")
-public class HystrixProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, org.apache.camel.Traceable, IdAware {
+public class HystrixProcessor extends AsyncProcessorSupport implements Navigate<Processor>, org.apache.camel.Traceable, IdAware {
private String id;
private final HystrixCommandGroupKey groupKey;
@@ -185,11 +183,6 @@ public class HystrixProcessor extends ServiceSupport implements AsyncProcessor,
}
@Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- @Override
public boolean process(Exchange exchange, AsyncCallback callback) {
// run this as if we run inside try .. catch so there is no regular Camel error handler
exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
index 148e5ee..ddf39da 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
@@ -20,8 +20,8 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.AsyncProcessorHelper;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@@ -30,7 +30,7 @@ import org.reactivestreams.Subscription;
/**
* A Processor that converts a Publisher into its content asynchronously.
*/
-public class UnwrapStreamProcessor implements AsyncProcessor {
+public class UnwrapStreamProcessor extends AsyncProcessorSupport {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
@@ -97,9 +97,4 @@ public class UnwrapStreamProcessor implements AsyncProcessor {
return true;
}
- @Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
}
diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
index f4acca7..6360368 100644
--- a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
+++ b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
@@ -30,6 +30,7 @@ import org.apache.camel.processor.RedeliveryErrorHandler;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
import org.apache.camel.spi.CamelLogger;
+import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.springframework.transaction.TransactionDefinition;
@@ -91,7 +92,7 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler {
}
@Override
- public void process(Exchange exchange) throws Exception {
+ public void process(Exchange exchange) {
// we have to run this synchronously as Spring Transaction does *not* support
// using multiple threads to span a transaction
if (transactionTemplate.getPropagationBehavior() != TransactionDefinition.PROPAGATION_REQUIRES_NEW && exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
@@ -120,7 +121,7 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler {
return true;
}
- protected void processInTransaction(final Exchange exchange) throws Exception {
+ protected void processInTransaction(final Exchange exchange) {
// is the exchange redelivered, for example JMS brokers support such details
Boolean externalRedelivered = exchange.isExternalRedelivered();
final String redelivered = externalRedelivered != null ? externalRedelivered.toString() : "unknown";
@@ -215,31 +216,12 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler {
* @param exchange the exchange
*/
protected void processByErrorHandler(final Exchange exchange) {
- final CountDownLatch latch = new CountDownLatch(1);
- boolean sync = super.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- if (!doneSync) {
- log.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId());
- latch.countDown();
- }
- }
-
+ awaitManager.process(new AsyncProcessorSupport() {
@Override
- public String toString() {
- return "Done " + TransactionErrorHandler.this.toString();
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ return TransactionErrorHandler.super.process(exchange, callback);
}
- });
- if (!sync) {
- log.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
- exchange.getExchangeId(), exchange);
- try {
- latch.await();
- } catch (InterruptedException e) {
- exchange.setException(e);
- }
- log.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}",
- exchange.getExchangeId(), exchange);
- }
+ }, exchange);
}
/**