You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/01/31 10:34:39 UTC

[3/3] camel git commit: CAMEL-10612: adding backpressure strategy as endpoint option

CAMEL-10612: adding backpressure strategy as endpoint option


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b64250b0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b64250b0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b64250b0

Branch: refs/heads/master
Commit: b64250b0fd349b293f0081e1f21a37fc8940e017
Parents: 69d5764
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Mon Jan 30 16:50:55 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Tue Jan 31 11:33:57 2017 +0100

----------------------------------------------------------------------
 .../main/docs/reactive-streams-component.adoc   |  3 +-
 .../streams/ReactiveStreamsConsumer.java        |  9 ++++-
 .../streams/ReactiveStreamsEndpoint.java        | 16 ++++++++
 .../streams/ReactiveStreamsProducer.java        | 13 ++++++
 .../api/CamelReactiveStreamsService.java        | 26 ++++++++++--
 .../reactive/streams/engine/CamelPublisher.java | 26 ++++++++++++
 .../engine/CamelReactiveStreamsServiceImpl.java | 15 ++++++-
 .../streams/engine/CamelSubscriber.java         |  2 +-
 .../streams/engine/CamelSubscription.java       |  6 ++-
 .../streams/BackpressureStrategyTest.java       | 42 ++++++++++++++++++++
 .../reactive/streams/BasicPublisherTest.java    | 17 ++++++++
 .../support/ReactiveStreamsTestService.java     | 15 ++++++-
 12 files changed, 177 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
index 747b95b..6d04b2c 100644
--- a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
+++ b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
@@ -55,7 +55,7 @@ The Reactive Streams component supports 2 options which are listed below.
 
 
 // endpoint options: START
-The Reactive Streams component supports 8 endpoint options which are listed below:
+The Reactive Streams component supports 9 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -68,6 +68,7 @@ The Reactive Streams component supports 8 endpoint options which are listed belo
 | maxInflightExchanges | consumer | 128 | Integer | Maximum number of exchanges concurrently being processed by Camel. This parameter controls backpressure on the stream. Setting a non-positive value will disable backpressure.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored.
 | exchangePattern | consumer (advanced) |  | ExchangePattern | Sets the exchange pattern when the consumer creates an exchange.
+| backpressureStrategy | producer |  | ReactiveStreamsBackpressureStrategy | The backpressure strategy to use when pushing events to a slow subscriber.
 | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
 |=======================================================================
 {% endraw %}

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index 081bbf4..2f1bdb7 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -22,6 +22,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
 import org.apache.camel.impl.DefaultConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +38,8 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
 
     private ExecutorService executor;
 
+    private CamelReactiveStreamsService service;
+
     public ReactiveStreamsConsumer(ReactiveStreamsEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
@@ -46,18 +49,20 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
 
+        this.service = CamelReactiveStreams.get(endpoint.getCamelContext(), endpoint.getServiceName());
+
         int poolSize = endpoint.getConcurrentConsumers();
         if (executor == null) {
             executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize);
         }
 
-        CamelReactiveStreams.get(endpoint.getCamelContext(), endpoint.getServiceName()).attachConsumer(endpoint.getStream(), this);
+        this.service.attachCamelConsumer(endpoint.getStream(), this);
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        CamelReactiveStreams.get(endpoint.getCamelContext(), endpoint.getServiceName()).detachConsumer(endpoint.getStream());
+        this.service.detachCamelConsumer(endpoint.getStream());
 
         if (executor != null) {
             endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
index 07a74e3..ca1f993 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
@@ -43,6 +43,9 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
     @UriParam
     private String serviceName;
 
+    @UriParam(label = "producer")
+    private ReactiveStreamsBackpressureStrategy backpressureStrategy;
+
     public ReactiveStreamsEndpoint(String endpointUri, ReactiveStreamsComponent component) {
         super(endpointUri, component);
     }
@@ -107,4 +110,17 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
     public void setServiceName(String serviceName) {
         this.serviceName = serviceName;
     }
+
+    public ReactiveStreamsBackpressureStrategy getBackpressureStrategy() {
+        return backpressureStrategy;
+    }
+
+    /**
+     * The backpressure strategy to use when pushing events to a slow subscriber.
+     */
+    public void setBackpressureStrategy(ReactiveStreamsBackpressureStrategy backpressureStrategy) {
+        this.backpressureStrategy = backpressureStrategy;
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
index 824f18d..4c75789 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
@@ -55,6 +55,19 @@ public class ReactiveStreamsProducer extends DefaultAsyncProducer {
     protected void doStart() throws Exception {
         super.doStart();
         this.service = CamelReactiveStreams.get(endpoint.getCamelContext(), endpoint.getServiceName());
+        this.service.attachCamelProducer(endpoint.getStream(), this);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        this.service.detachCamelProducer(endpoint.getStream());
+    }
+
+    @Override
+    public ReactiveStreamsEndpoint getEndpoint() {
+        return endpoint;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index 57c635e..b865a48 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -20,6 +20,7 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Service;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 
@@ -77,10 +78,27 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
     <T> Subscriber<T> getSubscriber(String name, Class<T> type);
 
     /*
-     * Methods for producers.
+     * Methods for Camel producers.
      */
 
     /**
+     * Used by Camel to associate the publisher of the stream with the given name to a specific Camel producer.
+     * This method is used to bind a Camel route to a reactive stream.
+     *
+     * @param name the stream name
+     * @param producer the producer of the route
+     * @throws IllegalStateException if another producer is already associated with the given stream name
+     */
+    void attachCamelProducer(String name, ReactiveStreamsProducer producer);
+
+    /**
+     * Used by Camel to detach the existing producer from the given stream.
+     *
+     * @param name the stream name
+     */
+    void detachCamelProducer(String name);
+
+    /**
      * Used by Camel to send the exchange to all active subscriptions on the given stream.
      * The callback is used to signal that the exchange has been delivered to the subscribers.
      *
@@ -91,7 +109,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
     void process(String name, Exchange exchange, DispatchCallback<Exchange> callback);
 
     /*
-     * Methods for consumers.
+     * Methods for Camel consumers.
      */
 
     /**
@@ -102,13 +120,13 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param consumer the consumer of the route
      * @throws IllegalStateException if another consumer is already associated with the given stream name
      */
-    void attachConsumer(String name, ReactiveStreamsConsumer consumer);
+    void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer);
 
     /**
      * Used by Camel to detach the existing consumer from the given stream.
      *
      * @param name the stream name
      */
-    void detachConsumer(String name);
+    void detachCamelConsumer(String name);
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
index 6a30625..5cafcd2 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
@@ -28,6 +28,8 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsEndpoint;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
 import org.apache.camel.component.reactive.streams.api.DispatchCallback;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
@@ -49,6 +51,8 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC
 
     private List<CamelSubscription> subscriptions = new CopyOnWriteArrayList<>();
 
+    private ReactiveStreamsProducer producer;
+
     public CamelPublisher(ExecutorService workerPool, CamelContext context, String name) {
         this.workerPool = workerPool;
         this.backpressureStrategy = ((ReactiveStreamsComponent) context.getComponent("reactive-streams")).getBackpressureStrategy();
@@ -98,6 +102,28 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC
         }
     }
 
+
+    public void attachProducer(ReactiveStreamsProducer producer) {
+        Objects.requireNonNull(producer, "producer cannot be null, use the detach method");
+        if (this.producer != null) {
+            throw new IllegalStateException("A producer is already attached to the stream '" + name + "'");
+        }
+        this.producer = producer;
+
+        // Apply endpoint options if available
+        ReactiveStreamsEndpoint endpoint = producer.getEndpoint();
+        if (endpoint.getBackpressureStrategy() != null) {
+            this.backpressureStrategy = endpoint.getBackpressureStrategy();
+            for (CamelSubscription sub : this.subscriptions) {
+                sub.setBackpressureStrategy(endpoint.getBackpressureStrategy());
+            }
+        }
+    }
+
+    public void detachProducer() {
+        this.producer = null;
+    }
+
     @Override
     public void close() throws Exception {
         for (CamelSubscription sub : subscriptions) {

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index ef617be..7b285a6 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
 import org.apache.camel.component.reactive.streams.api.DispatchCallback;
 import org.apache.camel.component.reactive.streams.util.ConvertingPublisher;
@@ -112,16 +113,26 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
     }
 
     @Override
-    public void attachConsumer(String name, ReactiveStreamsConsumer consumer) {
+    public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
         getSubscriber(name).attachConsumer(consumer);
     }
 
     @Override
-    public void detachConsumer(String name) {
+    public void detachCamelConsumer(String name) {
         getSubscriber(name).detachConsumer();
     }
 
     @Override
+    public void attachCamelProducer(String name, ReactiveStreamsProducer producer) {
+        getPayloadPublisher(name).attachProducer(producer);
+    }
+
+    @Override
+    public void detachCamelProducer(String name) {
+        getPayloadPublisher(name).detachProducer();
+    }
+
+    @Override
     public void setCamelContext(CamelContext camelContext) {
         this.context = camelContext;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index 3b7e5d5..89f5afc 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -57,7 +57,7 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
     public void attachConsumer(ReactiveStreamsConsumer consumer) {
         synchronized (this) {
             if (this.consumer != null) {
-                throw new IllegalStateException("A consumer is already attached on stream '" + name + "'");
+                throw new IllegalStateException("A consumer is already attached to the stream '" + name + "'");
             }
             this.consumer = consumer;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index e3489ef..d5f4337 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -248,5 +248,9 @@ public class CamelSubscription implements Subscription {
         checkAndFlush();
     }
 
-
+    public void setBackpressureStrategy(ReactiveStreamsBackpressureStrategy backpressureStrategy) {
+        mutex.lock();
+        this.backpressureStrategy = backpressureStrategy;
+        mutex.unlock();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
index 8846cd1..32e9cab 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
@@ -157,6 +157,48 @@ public class BackpressureStrategyTest extends CamelTestSupport {
         subscriber.cancel();
     }
 
+    @Test
+    public void testBackpressureDropStrategyInEndpoint() throws Exception {
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer:gen?period=20&repeatCount=20")
+                        .setBody().header(Exchange.TIMER_COUNTER)
+                        .to("reactive-streams:integers?backpressureStrategy=DROP");
+            }
+        }.addRoutesToCamelContext(context);
+
+
+        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        final CountDownLatch latch2 = new CountDownLatch(2);
+
+        TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>() {
+            @Override
+            public void onNext(Integer o) {
+                queue.add(o);
+                latch.countDown();
+                latch2.countDown();
+            }
+        };
+        subscriber.setInitiallyRequested(1);
+        CamelReactiveStreams.get(context).getPublisher("integers", Integer.class).subscribe(subscriber);
+
+        context().start();
+
+        assertTrue(latch.await(5, TimeUnit.SECONDS));
+        Thread.sleep(1000); // wait for all numbers to be generated
+
+        subscriber.request(19);
+        assertTrue(latch2.await(1, TimeUnit.SECONDS));
+        Thread.sleep(200); // wait a little longer to ensure no further items arrive
+        assertEquals(2, queue.size());
+        int sum = queue.stream().reduce((i, j) -> i + j).get();
+        assertEquals(3, sum); // 1 + 2 = 3
+
+        subscriber.cancel();
+    }
+
     @Override
     public boolean isUseRouteBuilder() {
         return false;

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java
index 84b15de..11910dc 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java
@@ -104,6 +104,23 @@ public class BasicPublisherTest extends CamelTestSupport {
         disp3.dispose();
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void testOnlyOneCamelProducerPerPublisher() throws Exception {
+
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:one")
+                        .to("reactive-streams:stream");
+
+                from("direct:two")
+                        .to("reactive-streams:stream");
+            }
+        }.addRoutesToCamelContext(context);
+
+        context.start();
+    }
+
     @Override
     public boolean isUseRouteBuilder() {
         return false;

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index b46c9ff..0301757 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.reactive.streams.support;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
 import org.apache.camel.component.reactive.streams.api.DispatchCallback;
 import org.reactivestreams.Publisher;
@@ -84,12 +85,22 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
     }
 
     @Override
-    public void attachConsumer(String name, ReactiveStreamsConsumer consumer) {
+    public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
 
     }
 
     @Override
-    public void detachConsumer(String name) {
+    public void detachCamelConsumer(String name) {
+
+    }
+
+    @Override
+    public void attachCamelProducer(String name, ReactiveStreamsProducer producer) {
+
+    }
+
+    @Override
+    public void detachCamelProducer(String name) {
 
     }