You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2018/11/22 16:33:24 UTC
[camel] 07/11: Add a method returning a CompletableFuture to
AsyncProcessor
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2a79d6ca41853dc71dfe3f151f395e57d0d44f99
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Fri Nov 16 15:48:55 2018 +0100
Add a method returning a CompletableFuture to AsyncProcessor
---
camel-api/src/main/java/org/apache/camel/AsyncProcessor.java | 5 +++++
.../java/org/apache/camel/component/bean/BeanProcessor.java | 7 +++++++
.../src/main/java/org/apache/camel/impl/DeferProducer.java | 9 +++++++++
.../java/org/apache/camel/processor/ConvertBodyProcessor.java | 10 ++++++++++
.../java/org/apache/camel/processor/DelegateSyncProcessor.java | 9 +++++++++
.../camel/processor/InterceptorToAsyncProcessorBridge.java | 10 ++++++++++
.../org/apache/camel/processor/RedeliveryErrorHandler.java | 8 ++++++++
.../apache/camel/processor/SharedCamelInternalProcessor.java | 9 +++++++++
.../apache/camel/support/AsyncProcessorConverterHelper.java | 9 +++++++++
.../java/org/apache/camel/support/AsyncProcessorSupport.java | 10 ++++++++++
.../java/org/apache/camel/support/DefaultAsyncProducer.java | 9 +++++++++
.../src/test/java/org/apache/camel/processor/MDCAsyncTest.java | 9 +++++++++
.../apache/camel/processor/async/AsyncEndpointPolicyTest.java | 9 +++++++++
.../apache/camel/cdi/transaction/TransactionErrorHandler.java | 9 +++++++++
.../main/java/org/apache/camel/component/cxf/CxfProducer.java | 3 ++-
.../org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java | 3 ++-
16 files changed, 126 insertions(+), 2 deletions(-)
diff --git a/camel-api/src/main/java/org/apache/camel/AsyncProcessor.java b/camel-api/src/main/java/org/apache/camel/AsyncProcessor.java
index 27da13e..442a61c 100644
--- a/camel-api/src/main/java/org/apache/camel/AsyncProcessor.java
+++ b/camel-api/src/main/java/org/apache/camel/AsyncProcessor.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel;
+import java.util.concurrent.CompletableFuture;
+
/**
* An <b>asynchronous</b> processor which can process an {@link Exchange} in an asynchronous fashion
* and signal completion by invoking the {@link AsyncCallback}.
@@ -39,4 +41,7 @@ public interface AsyncProcessor extends Processor {
* @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
*/
boolean process(Exchange exchange, AsyncCallback callback);
+
+ CompletableFuture<Exchange> processAsync(Exchange exchange);
+
}
diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java b/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java
index 7410508..17785c1 100644
--- a/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.bean;
+import java.util.concurrent.CompletableFuture;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
@@ -53,6 +55,11 @@ public class BeanProcessor extends ServiceSupport implements AsyncProcessor {
return delegate.process(exchange, callback);
}
+ @Override
+ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+ return delegate.processAsync(exchange);
+ }
+
public Processor getProcessor() {
return delegate.getProcessor();
}
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java b/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java
index c5cd048..6973015 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.impl;
+import java.util.concurrent.CompletableFuture;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
@@ -70,6 +72,13 @@ public class DeferProducer extends org.apache.camel.support.ServiceSupport imple
}
@Override
+ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+ AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+ process(exchange, callback);
+ return callback.getFuture();
+ }
+
+ @Override
protected void doStart() throws Exception {
// need to lookup endpoint again as it may be intercepted
Endpoint lookup = endpoint.getCamelContext().getEndpoint(endpoint.getEndpointUri());
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
index 70b717d..fae3396 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
@@ -16,10 +16,13 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.CompletableFuture;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.AsyncProcessorHelper;
import org.apache.camel.support.DefaultMessage;
@@ -115,6 +118,13 @@ public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcess
}
@Override
+ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+ AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+ process(exchange, callback);
+ return callback.getFuture();
+ }
+
+ @Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
process(exchange);
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java
index 963eec0..68a8bdf 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java
@@ -18,12 +18,14 @@ package org.apache.camel.processor;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
import org.apache.camel.support.ServiceHelper;
import org.apache.camel.support.ServiceSupport;
@@ -70,6 +72,13 @@ public class DelegateSyncProcessor extends ServiceSupport implements org.apache.
}
@Override
+ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+ AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+ process(exchange, callback);
+ return callback.getFuture();
+ }
+
+ @Override
public void process(Exchange exchange) throws Exception {
processor.process(exchange);
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java b/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
index 5ba23ed..ac9a976 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
@@ -16,10 +16,13 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.CompletableFuture;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.ServiceHelper;
import org.apache.camel.support.ServiceSupport;
@@ -76,6 +79,13 @@ public class InterceptorToAsyncProcessorBridge extends ServiceSupport implements
}
}
+ @Override
+ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+ AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+ process(exchange, callback);
+ return callback.getFuture();
+ }
+
public void setTarget(Processor target) {
this.target = AsyncProcessorConverterHelper.convert(target);
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index 43bf18c..1a87121 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -18,6 +18,7 @@ package org.apache.camel.processor;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -34,6 +35,7 @@ import org.apache.camel.Navigate;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
import org.apache.camel.model.OnExceptionDefinition;
import org.apache.camel.reifier.ErrorHandlerReifier;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
@@ -154,6 +156,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
return false;
}
+ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+ AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+ process(exchange, callback);
+ return callback.getFuture();
+ }
+
/**
* Allows to change the output of the error handler which are used when optimising the
* JMX instrumentation to use either an advice or wrapped processor when calling a processor.
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index 9848a6b..4f004ef 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
@@ -28,6 +29,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Ordered;
import org.apache.camel.Processor;
import org.apache.camel.Service;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.spi.Transformer;
@@ -89,6 +91,13 @@ public class SharedCamelInternalProcessor {
}
@Override
+ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+ AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+ process(exchange, callback);
+ return callback.getFuture();
+ }
+
+ @Override
public void process(Exchange exchange) throws Exception {
throw new IllegalStateException();
}
diff --git a/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorConverterHelper.java b/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorConverterHelper.java
index b332012..7640e04 100644
--- a/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorConverterHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorConverterHelper.java
@@ -18,6 +18,7 @@ package org.apache.camel.support;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
@@ -29,6 +30,7 @@ import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Service;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
/**
* A simple converter that can convert any {@link Processor} to an {@link AsyncProcessor}.
@@ -71,6 +73,13 @@ public final class AsyncProcessorConverterHelper {
}
@Override
+ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+ AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+ process(exchange, callback);
+ return callback.getFuture();
+ }
+
+ @Override
public String toString() {
if (processor != null) {
return processor.toString();
diff --git a/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java b/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
index e75f4e1..b7b986f 100644
--- a/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
@@ -16,8 +16,12 @@
*/
package org.apache.camel.support;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
public abstract class AsyncProcessorSupport extends ServiceSupport implements AsyncProcessor {
@@ -36,4 +40,10 @@ public abstract class AsyncProcessorSupport extends ServiceSupport implements As
awaitManager.process(this, exchange);
}
+ @Override
+ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+ AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+ process(exchange, callback);
+ return callback.getFuture();
+ }
}
diff --git a/camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java b/camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
index f566eb4..0116468 100644
--- a/camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
@@ -16,9 +16,12 @@
*/
package org.apache.camel.support;
+import java.util.concurrent.CompletableFuture;
+
import org.apache.camel.AsyncProducer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
/**
@@ -35,4 +38,10 @@ public abstract class DefaultAsyncProducer extends DefaultProducer implements As
AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
awaitManager.process(this, exchange);
}
+
+ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+ AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+ process(exchange, callback);
+ return callback.getFuture();
+ }
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MDCAsyncTest.java b/camel-core/src/test/java/org/apache/camel/processor/MDCAsyncTest.java
index 32f15eb..0395f7f 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/MDCAsyncTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/MDCAsyncTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -27,6 +28,7 @@ import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
import org.junit.Test;
import org.slf4j.MDC;
@@ -83,6 +85,13 @@ public class MDCAsyncTest extends ContextTestSupport {
}
@Override
+ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+ AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+ process(exchange, callback);
+ return callback.getFuture();
+ }
+
+ @Override
public boolean process(Exchange exchange, final AsyncCallback callback) {
EXECUTOR.submit(() -> callback.done(false));
return false;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
index ba59bb1..ae8a793 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor.async;
+import java.util.concurrent.CompletableFuture;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.ContextTestSupport;
@@ -23,6 +25,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.NamedNode;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.Policy;
@@ -132,6 +135,12 @@ public class AsyncEndpointPolicyTest extends ContextTestSupport {
final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
awaitManager.process(this, exchange);
}
+
+ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+ AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+ process(exchange, callback);
+ return callback.getFuture();
+ }
};
}
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java
index 68e8f30..5b8b79e 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java
@@ -18,6 +18,7 @@ package org.apache.camel.cdi.transaction;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import javax.transaction.TransactionRolledbackException;
@@ -30,6 +31,7 @@ import org.apache.camel.LoggingLevel;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
import org.apache.camel.processor.ErrorHandlerSupport;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
import org.apache.camel.spi.ShutdownPrepared;
@@ -116,6 +118,13 @@ public class TransactionErrorHandler extends ErrorHandlerSupport
return true;
}
+ @Override
+ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+ AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+ process(exchange, callback);
+ return callback.getFuture();
+ }
+
protected void processInTransaction(final Exchange exchange) throws Exception {
// is the exchange redelivered, for example JMS brokers support such
// details
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
index f4ff4d9..ff377fc 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
@@ -34,6 +34,7 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.cxf.common.message.CxfConstants;
+import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.DefaultProducer;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.support.ServiceHelper;
@@ -52,7 +53,7 @@ import org.apache.cxf.service.model.BindingOperationInfo;
* client, and sends the request to a CXF to a server. Any response will
* be bound to Camel exchange.
*/
-public class CxfProducer extends DefaultProducer implements AsyncProcessor {
+public class CxfProducer extends DefaultAsyncProducer {
private Client client;
private CxfEndpoint endpoint;
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
index 2d8322d..9f001b3 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
@@ -47,6 +47,7 @@ import org.apache.camel.component.cxf.CxfEndpointUtils;
import org.apache.camel.component.cxf.CxfOperationException;
import org.apache.camel.component.cxf.common.message.CxfConstants;
import org.apache.camel.http.common.cookie.CookieHandler;
+import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.DefaultProducer;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.LRUSoftCache;
@@ -64,7 +65,7 @@ import org.slf4j.LoggerFactory;
* JAXRS client, it will turn the normal Object invocation to a RESTful request
* according to resource annotation. Any response will be bound to Camel exchange.
*/
-public class CxfRsProducer extends DefaultProducer implements AsyncProcessor {
+public class CxfRsProducer extends DefaultAsyncProducer {
private static final Logger LOG = LoggerFactory.getLogger(CxfRsProducer.class);