You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2018/11/22 16:33:24 UTC

[camel] 07/11: Add a method returning a CompletableFuture to AsyncProcessor

This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2a79d6ca41853dc71dfe3f151f395e57d0d44f99
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Fri Nov 16 15:48:55 2018 +0100

    Add a method returning a CompletableFuture to AsyncProcessor
---
 camel-api/src/main/java/org/apache/camel/AsyncProcessor.java   |  5 +++++
 .../java/org/apache/camel/component/bean/BeanProcessor.java    |  7 +++++++
 .../src/main/java/org/apache/camel/impl/DeferProducer.java     |  9 +++++++++
 .../java/org/apache/camel/processor/ConvertBodyProcessor.java  | 10 ++++++++++
 .../java/org/apache/camel/processor/DelegateSyncProcessor.java |  9 +++++++++
 .../camel/processor/InterceptorToAsyncProcessorBridge.java     | 10 ++++++++++
 .../org/apache/camel/processor/RedeliveryErrorHandler.java     |  8 ++++++++
 .../apache/camel/processor/SharedCamelInternalProcessor.java   |  9 +++++++++
 .../apache/camel/support/AsyncProcessorConverterHelper.java    |  9 +++++++++
 .../java/org/apache/camel/support/AsyncProcessorSupport.java   | 10 ++++++++++
 .../java/org/apache/camel/support/DefaultAsyncProducer.java    |  9 +++++++++
 .../src/test/java/org/apache/camel/processor/MDCAsyncTest.java |  9 +++++++++
 .../apache/camel/processor/async/AsyncEndpointPolicyTest.java  |  9 +++++++++
 .../apache/camel/cdi/transaction/TransactionErrorHandler.java  |  9 +++++++++
 .../main/java/org/apache/camel/component/cxf/CxfProducer.java  |  3 ++-
 .../org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java    |  3 ++-
 16 files changed, 126 insertions(+), 2 deletions(-)

diff --git a/camel-api/src/main/java/org/apache/camel/AsyncProcessor.java b/camel-api/src/main/java/org/apache/camel/AsyncProcessor.java
index 27da13e..442a61c 100644
--- a/camel-api/src/main/java/org/apache/camel/AsyncProcessor.java
+++ b/camel-api/src/main/java/org/apache/camel/AsyncProcessor.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * An <b>asynchronous</b> processor which can process an {@link Exchange} in an asynchronous fashion
  * and signal completion by invoking the {@link AsyncCallback}.
@@ -39,4 +41,7 @@ public interface AsyncProcessor extends Processor {
      * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
      */
     boolean process(Exchange exchange, AsyncCallback callback);
+
+    CompletableFuture<Exchange> processAsync(Exchange exchange);
+
 }
diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java b/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java
index 7410508..17785c1 100644
--- a/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.bean;
 
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
@@ -53,6 +55,11 @@ public class BeanProcessor extends ServiceSupport implements AsyncProcessor {
         return delegate.process(exchange, callback);
     }
 
+    @Override
+    public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+        return delegate.processAsync(exchange);
+    }
+
     public Processor getProcessor() {
         return delegate.getProcessor();
     }
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java b/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java
index c5cd048..6973015 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.impl;
 
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Endpoint;
@@ -70,6 +72,13 @@ public class DeferProducer extends org.apache.camel.support.ServiceSupport imple
     }
 
     @Override
+    public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+        AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+        process(exchange, callback);
+        return callback.getFuture();
+    }
+
+    @Override
     protected void doStart() throws Exception {
         // need to lookup endpoint again as it may be intercepted
         Endpoint lookup = endpoint.getCamelContext().getEndpoint(endpoint.getEndpointUri());
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
index 70b717d..fae3396 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
@@ -16,10 +16,13 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.DefaultMessage;
@@ -115,6 +118,13 @@ public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcess
     }
 
     @Override
+    public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+        AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+        process(exchange, callback);
+        return callback.getFuture();
+    }
+
+    @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
             process(exchange);
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java
index 963eec0..68a8bdf 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java
@@ -18,12 +18,14 @@ package org.apache.camel.processor;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.support.ServiceHelper;
 import org.apache.camel.support.ServiceSupport;
 
@@ -70,6 +72,13 @@ public class DelegateSyncProcessor extends ServiceSupport implements org.apache.
     }
 
     @Override
+    public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+        AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+        process(exchange, callback);
+        return callback.getFuture();
+    }
+
+    @Override
     public void process(Exchange exchange) throws Exception {
         processor.process(exchange);
     }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java b/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
index 5ba23ed..ac9a976 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
@@ -16,10 +16,13 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.ServiceHelper;
 import org.apache.camel.support.ServiceSupport;
@@ -76,6 +79,13 @@ public class InterceptorToAsyncProcessorBridge extends ServiceSupport implements
         }
     }
 
+    @Override
+    public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+        AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+        process(exchange, callback);
+        return callback.getFuture();
+    }
+
     public void setTarget(Processor target) {
         this.target = AsyncProcessorConverterHelper.convert(target);
     }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index 43bf18c..1a87121 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -18,6 +18,7 @@ package org.apache.camel.processor;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -34,6 +35,7 @@ import org.apache.camel.Navigate;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.model.OnExceptionDefinition;
 import org.apache.camel.reifier.ErrorHandlerReifier;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
@@ -154,6 +156,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         return false;
     }
 
+    public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+        AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+        process(exchange, callback);
+        return callback.getFuture();
+    }
+
     /**
      * Allows to change the output of the error handler which are used when optimising the
      * JMX instrumentation to use either an advice or wrapped processor when calling a processor.
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index 9848a6b..4f004ef 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
 
@@ -28,6 +29,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Ordered;
 import org.apache.camel.Processor;
 import org.apache.camel.Service;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.Transformer;
@@ -89,6 +91,13 @@ public class SharedCamelInternalProcessor {
             }
 
             @Override
+            public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+                AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+                process(exchange, callback);
+                return callback.getFuture();
+            }
+
+            @Override
             public void process(Exchange exchange) throws Exception {
                 throw new IllegalStateException();
             }
diff --git a/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorConverterHelper.java b/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorConverterHelper.java
index b332012..7640e04 100644
--- a/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorConverterHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorConverterHelper.java
@@ -18,6 +18,7 @@ package org.apache.camel.support;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -29,6 +30,7 @@ import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.Service;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
 
 /**
  * A simple converter that can convert any {@link Processor} to an {@link AsyncProcessor}.
@@ -71,6 +73,13 @@ public final class AsyncProcessorConverterHelper {
         }
 
         @Override
+        public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+            AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+            process(exchange, callback);
+            return callback.getFuture();
+        }
+
+        @Override
         public String toString() {
             if (processor != null) {
                 return processor.toString();
diff --git a/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java b/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
index e75f4e1..b7b986f 100644
--- a/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
@@ -16,8 +16,12 @@
  */
 package org.apache.camel.support;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 
 public abstract class AsyncProcessorSupport extends ServiceSupport implements AsyncProcessor {
@@ -36,4 +40,10 @@ public abstract class AsyncProcessorSupport extends ServiceSupport implements As
         awaitManager.process(this, exchange);
     }
 
+    @Override
+    public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+        AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+        process(exchange, callback);
+        return callback.getFuture();
+    }
 }
diff --git a/camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java b/camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
index f566eb4..0116468 100644
--- a/camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
@@ -16,9 +16,12 @@
  */
 package org.apache.camel.support;
 
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.camel.AsyncProducer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 
 /**
@@ -35,4 +38,10 @@ public abstract class DefaultAsyncProducer extends DefaultProducer implements As
         AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
         awaitManager.process(this, exchange);
     }
+
+    public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+        AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+        process(exchange, callback);
+        return callback.getFuture();
+    }
 }
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MDCAsyncTest.java b/camel-core/src/test/java/org/apache/camel/processor/MDCAsyncTest.java
index 32f15eb..0395f7f 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/MDCAsyncTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/MDCAsyncTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -27,6 +28,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
 import org.junit.Test;
 import org.slf4j.MDC;
 
@@ -83,6 +85,13 @@ public class MDCAsyncTest extends ContextTestSupport {
         }
 
         @Override
+        public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+            AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+            process(exchange, callback);
+            return callback.getFuture();
+        }
+
+        @Override
         public boolean process(Exchange exchange, final AsyncCallback callback) {
             EXECUTOR.submit(() -> callback.done(false));
             return false;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
index ba59bb1..ae8a793 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor.async;
 
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.ContextTestSupport;
@@ -23,6 +25,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.NamedNode;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.Policy;
@@ -132,6 +135,12 @@ public class AsyncEndpointPolicyTest extends ContextTestSupport {
                     final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
                     awaitManager.process(this, exchange);
                 }
+
+                public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+                    AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+                    process(exchange, callback);
+                    return callback.getFuture();
+                }
             };
         }
 
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java
index 68e8f30..5b8b79e 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionErrorHandler.java
@@ -18,6 +18,7 @@ package org.apache.camel.cdi.transaction;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 
 import javax.transaction.TransactionRolledbackException;
@@ -30,6 +31,7 @@ import org.apache.camel.LoggingLevel;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.impl.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.processor.ErrorHandlerSupport;
 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
 import org.apache.camel.spi.ShutdownPrepared;
@@ -116,6 +118,13 @@ public class TransactionErrorHandler extends ErrorHandlerSupport
         return true;
     }
 
+    @Override
+    public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+        AsyncCallbackToCompletableFutureAdapter<Exchange> callback = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+        process(exchange, callback);
+        return callback.getFuture();
+    }
+
     protected void processInTransaction(final Exchange exchange) throws Exception {
         // is the exchange redelivered, for example JMS brokers support such
         // details
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
index f4ff4d9..ff377fc 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
@@ -34,6 +34,7 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.cxf.common.message.CxfConstants;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.DefaultProducer;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.support.ServiceHelper;
@@ -52,7 +53,7 @@ import org.apache.cxf.service.model.BindingOperationInfo;
  * client, and sends the request to a CXF to a server.  Any response will 
  * be bound to Camel exchange. 
  */
-public class CxfProducer extends DefaultProducer implements AsyncProcessor {
+public class CxfProducer extends DefaultAsyncProducer {
 
     private Client client;
     private CxfEndpoint endpoint;
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
index 2d8322d..9f001b3 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
@@ -47,6 +47,7 @@ import org.apache.camel.component.cxf.CxfEndpointUtils;
 import org.apache.camel.component.cxf.CxfOperationException;
 import org.apache.camel.component.cxf.common.message.CxfConstants;
 import org.apache.camel.http.common.cookie.CookieHandler;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.DefaultProducer;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.LRUSoftCache;
@@ -64,7 +65,7 @@ import org.slf4j.LoggerFactory;
  * JAXRS client, it will turn the normal Object invocation to a RESTful request
  * according to resource annotation.  Any response will be bound to Camel exchange.
  */
-public class CxfRsProducer extends DefaultProducer implements AsyncProcessor {
+public class CxfRsProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(CxfRsProducer.class);