You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/01/31 10:34:37 UTC

[1/3] camel git commit: CAMEL-10612: forwarding lifecycle event

Repository: camel
Updated Branches:
  refs/heads/master 69d57640e -> ce80e9bba


CAMEL-10612: forwarding lifecycle event


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ce80e9bb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ce80e9bb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ce80e9bb

Branch: refs/heads/master
Commit: ce80e9bbad2e5a44ab1b5aa1143c2f67b5a7c79b
Parents: 8f36026
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Tue Jan 31 11:33:49 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Tue Jan 31 11:33:57 2017 +0100

----------------------------------------------------------------------
 .../main/docs/reactive-streams-component.adoc   |   4 +-
 .../streams/ReactiveStreamsConstants.java       |  35 ++++
 .../streams/ReactiveStreamsConsumer.java        |  26 +++
 .../streams/ReactiveStreamsEndpoint.java        |  32 +++-
 .../streams/engine/CamelSubscriber.java         |  15 ++
 .../reactive/streams/EventTypeTest.java         | 181 +++++++++++++++++++
 6 files changed, 290 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ce80e9bb/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
index 715e2e6..7715b61 100644
--- a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
+++ b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
@@ -55,7 +55,7 @@ The Reactive Streams component supports 2 options which are listed below.
 
 
 // endpoint options: START
-The Reactive Streams component supports 9 endpoint options which are listed below:
+The Reactive Streams component supports 11 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -65,6 +65,8 @@ The Reactive Streams component supports 9 endpoint options which are listed belo
 | serviceName | common |  | String | Allows using an alternative CamelReactiveStreamService implementation. The implementation is looked up from the registry.
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored.
 | concurrentConsumers | consumer | 1 | int | Number of threads used to process exchanges in the Camel route.
+| forwardOnComplete | consumer | false | boolean | Determines if onComplete events should be pushed to the Camel route.
+| forwardOnError | consumer | false | boolean | Determines if onError events should be pushed to the Camel route. Exceptions will be set as message body.
 | maxInflightExchanges | consumer | 128 | Integer | Maximum number of exchanges concurrently being processed by Camel. This parameter controls backpressure on the stream. Setting a non-positive value will disable backpressure.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored.
 | exchangePattern | consumer (advanced) |  | ExchangePattern | Sets the exchange pattern when the consumer creates an exchange.

http://git-wip-us.apache.org/repos/asf/camel/blob/ce80e9bb/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConstants.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConstants.java
new file mode 100644
index 0000000..9e17aff
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConstants.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+/**
+ * Useful constants used in the Camel Reactive Streams component.
+ */
+public final class ReactiveStreamsConstants {
+
+    /**
+     * Every exchange consumed by Camel has this header set to indicate if the exchange
+     * contains an item (value="onNext"), an error (value="onError") or a completion event (value="onComplete").
+     * Errors and completion notifications are not forwarded by default.
+     */
+    public static final String REACTIVE_STREAMS_EVENT_TYPE = "CamelReactiveStreamsEventType";
+
+
+    private ReactiveStreamsConstants() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce80e9bb/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index 2f1bdb7..190fa6c 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -71,6 +71,32 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
     }
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
+        exchange.getIn().setHeader(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE, "onNext");
+        return doSend(exchange, callback);
+    }
+
+    public void onComplete() {
+        if (endpoint.isForwardOnComplete()) {
+            Exchange exchange = endpoint.createExchange();
+            exchange.getIn().setHeader(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE, "onComplete");
+
+            doSend(exchange, done -> {
+            });
+        }
+    }
+
+    public void onError(Throwable error) {
+        if (endpoint.isForwardOnError()) {
+            Exchange exchange = endpoint.createExchange();
+            exchange.getIn().setHeader(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE, "onError");
+            exchange.getIn().setBody(error);
+
+            doSend(exchange, done -> {
+            });
+        }
+    }
+
+    private boolean doSend(Exchange exchange, AsyncCallback callback) {
         ExecutorService executorService = this.executor;
         if (executorService != null && this.isRunAllowed()) {
 

http://git-wip-us.apache.org/repos/asf/camel/blob/ce80e9bb/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
index ca1f993..9f1d919 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
@@ -34,14 +34,20 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
     @UriPath
     private String stream;
 
+    @UriParam
+    private String serviceName;
+
     @UriParam(label = "consumer", defaultValue = "128")
     private Integer maxInflightExchanges = 128;
 
     @UriParam(label = "consumer", defaultValue = "1")
     private int concurrentConsumers = 1;
 
-    @UriParam
-    private String serviceName;
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean forwardOnComplete;
+
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean forwardOnError;
 
     @UriParam(label = "producer")
     private ReactiveStreamsBackpressureStrategy backpressureStrategy;
@@ -122,5 +128,27 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
         this.backpressureStrategy = backpressureStrategy;
     }
 
+    public boolean isForwardOnComplete() {
+        return forwardOnComplete;
+    }
+
+    /**
+     * Determines if onComplete events should be pushed to the Camel route.
+     */
+    public void setForwardOnComplete(boolean forwardOnComplete) {
+        this.forwardOnComplete = forwardOnComplete;
+    }
+
+    public boolean isForwardOnError() {
+        return forwardOnError;
+    }
+
+    /**
+     * Determines if onError events should be pushed to the Camel route.
+     * Exceptions will be set as message body.
+     */
+    public void setForwardOnError(boolean forwardOnError) {
+        this.forwardOnError = forwardOnError;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ce80e9bb/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index 89f5afc..eda2863 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -149,17 +149,32 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
         }
 
         LOG.error("Error in reactive stream '" + name + "'", throwable);
+
+        ReactiveStreamsConsumer consumer;
         synchronized (this) {
+            consumer = this.consumer;
             this.subscription = null;
         }
+
+        if (consumer != null) {
+            consumer.onError(throwable);
+        }
+
     }
 
     @Override
     public void onComplete() {
         LOG.info("Reactive stream '{}' completed", name);
+
+        ReactiveStreamsConsumer consumer;
         synchronized (this) {
+            consumer = this.consumer;
             this.subscription = null;
         }
+
+        if (consumer != null) {
+            consumer.onComplete();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/ce80e9bb/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/EventTypeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/EventTypeTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/EventTypeTest.java
new file mode 100644
index 0000000..bb69211
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/EventTypeTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import io.reactivex.Flowable;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.reactivestreams.Subscriber;
+
+
+public class EventTypeTest extends CamelTestSupport {
+
+    @Test
+    public void testOnCompleteHeaderForwarded() throws Exception {
+
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("reactive-streams:numbers?forwardOnComplete=true")
+                        .to("mock:endpoint");
+            }
+        }.addRoutesToCamelContext(context);
+
+        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).getSubscriber("numbers", Integer.class);
+
+        context.start();
+
+        Flowable.<Integer>empty()
+                .subscribe(numbers);
+
+
+        MockEndpoint endpoint = getMockEndpoint("mock:endpoint");
+        endpoint.expectedMessageCount(1);
+        endpoint.expectedHeaderReceived(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE, "onComplete");
+        endpoint.expectedBodiesReceived(new Object[]{null});
+        endpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testOnCompleteHeaderNotForwarded() throws Exception {
+
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("reactive-streams:numbers")
+                        .to("mock:endpoint");
+            }
+        }.addRoutesToCamelContext(context);
+
+        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).getSubscriber("numbers", Integer.class);
+
+        context.start();
+
+        Flowable.<Integer>empty()
+                .subscribe(numbers);
+
+
+        MockEndpoint endpoint = getMockEndpoint("mock:endpoint");
+        endpoint.expectedMessageCount(0);
+        endpoint.assertIsSatisfied(200);
+    }
+
+    @Test
+    public void testOnNextHeaderForwarded() throws Exception {
+
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("reactive-streams:numbers")
+                        .to("mock:endpoint");
+            }
+        }.addRoutesToCamelContext(context);
+
+        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).getSubscriber("numbers", Integer.class);
+
+        context.start();
+
+        Flowable.just(1)
+                .subscribe(numbers);
+
+        MockEndpoint endpoint = getMockEndpoint("mock:endpoint");
+        endpoint.expectedHeaderReceived(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE, "onNext");
+        endpoint.expectedMessageCount(1);
+        endpoint.assertIsSatisfied();
+
+        Exchange ex = endpoint.getExchanges().get(0);
+        assertEquals(1, ex.getIn().getBody());
+    }
+
+    @Test
+    public void testOnErrorHeaderForwarded() throws Exception {
+
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("reactive-streams:numbers?forwardOnError=true")
+                        .to("mock:endpoint");
+            }
+        }.addRoutesToCamelContext(context);
+
+        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).getSubscriber("numbers", Integer.class);
+
+        context.start();
+
+        RuntimeException ex = new RuntimeException("1");
+
+        Flowable.just(1)
+                .map(n -> {
+                    if (n == 1) {
+                        throw ex;
+                    }
+                    return n;
+                })
+                .subscribe(numbers);
+
+
+        MockEndpoint endpoint = getMockEndpoint("mock:endpoint");
+        endpoint.expectedMessageCount(1);
+        endpoint.expectedHeaderReceived(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE, "onError");
+        endpoint.assertIsSatisfied();
+
+        Exchange exch = endpoint.getExchanges().get(0);
+        assertEquals(ex, exch.getIn().getBody());
+    }
+
+    @Test
+    public void testOnErrorHeaderNotForwarded() throws Exception {
+
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("reactive-streams:numbers")
+                        .to("mock:endpoint");
+            }
+        }.addRoutesToCamelContext(context);
+
+        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).getSubscriber("numbers", Integer.class);
+
+        context.start();
+
+        RuntimeException ex = new RuntimeException("1");
+
+        Flowable.just(1)
+                .map(n -> {
+                    if (n == 1) {
+                        throw ex;
+                    }
+                    return n;
+                })
+                .subscribe(numbers);
+
+
+        MockEndpoint endpoint = getMockEndpoint("mock:endpoint");
+        endpoint.expectedMessageCount(0);
+        endpoint.assertIsSatisfied(200);
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+}


[2/3] camel git commit: CAMEL-10612: refactoring backpressure strategy

Posted by nf...@apache.org.
CAMEL-10612: refactoring backpressure strategy


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8f360260
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8f360260
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8f360260

Branch: refs/heads/master
Commit: 8f36026009c6f101e81b82010779591b2e73b5fe
Parents: b64250b
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Mon Jan 30 17:40:31 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Tue Jan 31 11:33:57 2017 +0100

----------------------------------------------------------------------
 .../main/docs/reactive-streams-component.adoc   | 11 +++++
 .../ReactiveStreamsBackpressureStrategy.java    | 49 ++++++++++++++++++--
 .../streams/engine/CamelSubscription.java       | 45 +++++++-----------
 3 files changed, 75 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8f360260/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
index 6d04b2c..715e2e6 100644
--- a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
+++ b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
@@ -193,6 +193,17 @@ In other circumstances, eg. when using a `http` consumer, the route suspension m
 using the default configuration (no policy, unbounded buffer) should be preferable. Users should try to avoid memory issues
 by limiting the number of requests to the http service (eg. scaling out).
 
+In contexts where a certain amount of data loss is acceptable, setting a backpressure strategy other than `BUFFER` can
+ be a solution for dealing with fast sources.
+
+[source,java]
+---------------------------------------------------------
+from("direct:thermostat")
+.to("reactive-streams:flow?backpressureStrategy=LATEST");
+---------------------------------------------------------
+
+When the `LATEST` backpressure strategy is used, only the last exchange received from the route is kept by the publisher, while older data is discarded (other options are available).
+
 ### Controlling Backpressure (consumer side)
 
 When Camel consumes items from a reactive-streams publisher, the maximum number of inflight exchanges can be set as endpoint option.

http://git-wip-us.apache.org/repos/asf/camel/blob/8f360260/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
index 823a7b8..db46915 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
@@ -16,6 +16,11 @@
  */
 package org.apache.camel.component.reactive.streams;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+
 /**
  * A list of possible backpressure strategy to use when the emission of upstream items cannot respect backpressure.
  */
@@ -24,17 +29,55 @@ public enum ReactiveStreamsBackpressureStrategy {
     /**
      * Buffers <em>all</em> onNext values until the downstream consumes it.
      */
-    BUFFER,
+    BUFFER {
+        @Override
+        public <T> Collection<T> update(Deque<T> buffer, T element) {
+            buffer.addLast(element);
+            return Collections.emptySet();
+        }
+    },
 
     /**
      * Drops the most recent onNext value if the downstream can't keep up.
      */
-    DROP,
+    DROP {
+        @Override
+        public <T> Collection<T> update(Deque<T> buffer, T element) {
+            if (buffer.size() > 0) {
+                return Arrays.asList(element);
+            } else {
+                buffer.addLast(element);
+                return Collections.emptySet();
+            }
+        }
+    },
 
     /**
      * Keeps only the latest onNext value, overwriting any previous value if the
      * downstream can't keep up.
      */
-    LATEST
+    LATEST {
+        @Override
+        public <T> Collection<T> update(Deque<T> buffer, T element) {
+            Collection<T> discarded = Collections.emptySet();
+            if (buffer.size() > 0) {
+                discarded = Arrays.asList(buffer.removeFirst());
+            }
+
+            buffer.addLast(element);
+            return discarded;
+        }
+    };
+
+
+    /**
+     * Updates the buffer and returns a list of discarded elements (if any).
+     *
+     * @param buffer the buffer to update
+     * @param element the element that should possibly be inserted
+     * @param <T> the generic type of the element
+     * @return the list of discarded elements
+     */
+    public abstract <T> Collection<T> update(Deque<T> buffer, T element);
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8f360260/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index d5f4337..16c4eea 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -16,8 +16,12 @@
  */
 package org.apache.camel.component.reactive.streams.engine;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -205,44 +209,31 @@ public class CamelSubscription implements Subscription {
     }
 
     public void publish(StreamPayload<Exchange> message) {
-        StreamPayload<Exchange> discardedMessage = null;
-        String discardReason = null;
+        Map<StreamPayload<Exchange>, String> discardedMessages = null;
         try {
             mutex.lock();
             if (!this.terminating && !this.terminated) {
-                if (this.backpressureStrategy == ReactiveStreamsBackpressureStrategy.BUFFER) {
-                    buffer.addLast(message);
-                } else if (this.backpressureStrategy == ReactiveStreamsBackpressureStrategy.DROP) {
-                    if (buffer.size() > 0) {
-                        LOG.warn("Exchange " + message.getItem() + " dropped according to the backpressure strategy " + ReactiveStreamsBackpressureStrategy.DROP);
-                        discardedMessage = message;
-                        discardReason = "the backpressure strategy (DROP) does not allow buffering";
-                    } else {
-                        buffer.addLast(message);
+                Collection<StreamPayload<Exchange>> discarded = this.backpressureStrategy.update(buffer, message);
+                if (discarded.iterator().hasNext()) {
+                    discardedMessages = new HashMap<>();
+                    for (StreamPayload<Exchange> ex : discarded) {
+                        discardedMessages.put(ex, "Exchange " + ex.getItem() + " discarded by backpressure strategy " + this.backpressureStrategy);
                     }
-                } else if (this.backpressureStrategy == ReactiveStreamsBackpressureStrategy.LATEST) {
-                    if (buffer.size() > 0) {
-                        StreamPayload<Exchange> older = buffer.removeFirst();
-                        LOG.warn("Exchange " + message.getItem() + " dropped according to the backpressure strategy " + ReactiveStreamsBackpressureStrategy.LATEST);
-                        discardedMessage = older;
-                        discardReason = "the backpressure strategy (LATEST) does not allow buffering";
-                    }
-                    buffer.addLast(message);
-                } else {
-                    throw new IllegalStateException("Unsupported backpressure strategy: " + this.backpressureStrategy);
                 }
-
             } else {
-                discardedMessage = message;
-                discardReason = "subscription closed";
+                // acknowledge
+                discardedMessages = Collections.singletonMap(message, "Exchange " + message.getItem() + " discarded: subscription closed");
             }
         } finally {
             mutex.unlock();
         }
 
-        if (discardedMessage != null) {
-            // acknowledge
-            discardedMessage.getCallback().processed(message.getItem(), new IllegalStateException("Exchange discarded: " + discardReason));
+        // discarding outside of mutex scope
+        if (discardedMessages != null) {
+            for (Map.Entry<StreamPayload<Exchange>, String> discarded : discardedMessages.entrySet()) {
+                StreamPayload<Exchange> m = discarded.getKey();
+                m.getCallback().processed(m.getItem(), new IllegalStateException(discarded.getValue()));
+            }
         }
 
         checkAndFlush();


[3/3] camel git commit: CAMEL-10612: adding backpressure strategy as endpoint option

Posted by nf...@apache.org.
CAMEL-10612: adding backpressure strategy as endpoint option


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b64250b0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b64250b0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b64250b0

Branch: refs/heads/master
Commit: b64250b0fd349b293f0081e1f21a37fc8940e017
Parents: 69d5764
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Mon Jan 30 16:50:55 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Tue Jan 31 11:33:57 2017 +0100

----------------------------------------------------------------------
 .../main/docs/reactive-streams-component.adoc   |  3 +-
 .../streams/ReactiveStreamsConsumer.java        |  9 ++++-
 .../streams/ReactiveStreamsEndpoint.java        | 16 ++++++++
 .../streams/ReactiveStreamsProducer.java        | 13 ++++++
 .../api/CamelReactiveStreamsService.java        | 26 ++++++++++--
 .../reactive/streams/engine/CamelPublisher.java | 26 ++++++++++++
 .../engine/CamelReactiveStreamsServiceImpl.java | 15 ++++++-
 .../streams/engine/CamelSubscriber.java         |  2 +-
 .../streams/engine/CamelSubscription.java       |  6 ++-
 .../streams/BackpressureStrategyTest.java       | 42 ++++++++++++++++++++
 .../reactive/streams/BasicPublisherTest.java    | 17 ++++++++
 .../support/ReactiveStreamsTestService.java     | 15 ++++++-
 12 files changed, 177 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
index 747b95b..6d04b2c 100644
--- a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
+++ b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
@@ -55,7 +55,7 @@ The Reactive Streams component supports 2 options which are listed below.
 
 
 // endpoint options: START
-The Reactive Streams component supports 8 endpoint options which are listed below:
+The Reactive Streams component supports 9 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -68,6 +68,7 @@ The Reactive Streams component supports 8 endpoint options which are listed belo
 | maxInflightExchanges | consumer | 128 | Integer | Maximum number of exchanges concurrently being processed by Camel. This parameter controls backpressure on the stream. Setting a non-positive value will disable backpressure.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored.
 | exchangePattern | consumer (advanced) |  | ExchangePattern | Sets the exchange pattern when the consumer creates an exchange.
+| backpressureStrategy | producer |  | ReactiveStreamsBackpressureStrategy | The backpressure strategy to use when pushing events to a slow subscriber.
 | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
 |=======================================================================
 {% endraw %}

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index 081bbf4..2f1bdb7 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -22,6 +22,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
 import org.apache.camel.impl.DefaultConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +38,8 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
 
     private ExecutorService executor;
 
+    private CamelReactiveStreamsService service;
+
     public ReactiveStreamsConsumer(ReactiveStreamsEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
@@ -46,18 +49,20 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
 
+        this.service = CamelReactiveStreams.get(endpoint.getCamelContext(), endpoint.getServiceName());
+
         int poolSize = endpoint.getConcurrentConsumers();
         if (executor == null) {
             executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize);
         }
 
-        CamelReactiveStreams.get(endpoint.getCamelContext(), endpoint.getServiceName()).attachConsumer(endpoint.getStream(), this);
+        this.service.attachCamelConsumer(endpoint.getStream(), this);
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        CamelReactiveStreams.get(endpoint.getCamelContext(), endpoint.getServiceName()).detachConsumer(endpoint.getStream());
+        this.service.detachCamelConsumer(endpoint.getStream());
 
         if (executor != null) {
             endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
index 07a74e3..ca1f993 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
@@ -43,6 +43,9 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
     @UriParam
     private String serviceName;
 
+    @UriParam(label = "producer")
+    private ReactiveStreamsBackpressureStrategy backpressureStrategy;
+
     public ReactiveStreamsEndpoint(String endpointUri, ReactiveStreamsComponent component) {
         super(endpointUri, component);
     }
@@ -107,4 +110,17 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
     public void setServiceName(String serviceName) {
         this.serviceName = serviceName;
     }
+
+    public ReactiveStreamsBackpressureStrategy getBackpressureStrategy() {
+        return backpressureStrategy;
+    }
+
+    /**
+     * The backpressure strategy to use when pushing events to a slow subscriber.
+     */
+    public void setBackpressureStrategy(ReactiveStreamsBackpressureStrategy backpressureStrategy) {
+        this.backpressureStrategy = backpressureStrategy;
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
index 824f18d..4c75789 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
@@ -55,6 +55,19 @@ public class ReactiveStreamsProducer extends DefaultAsyncProducer {
     protected void doStart() throws Exception {
         super.doStart();
         this.service = CamelReactiveStreams.get(endpoint.getCamelContext(), endpoint.getServiceName());
+        this.service.attachCamelProducer(endpoint.getStream(), this);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        this.service.detachCamelProducer(endpoint.getStream());
+    }
+
+    @Override
+    public ReactiveStreamsEndpoint getEndpoint() {
+        return endpoint;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index 57c635e..b865a48 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -20,6 +20,7 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Service;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 
@@ -77,10 +78,27 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
     <T> Subscriber<T> getSubscriber(String name, Class<T> type);
 
     /*
-     * Methods for producers.
+     * Methods for Camel producers.
      */
 
     /**
+     * Used by Camel to associate the publisher of the stream with the given name to a specific Camel producer.
+     * This method is used to bind a Camel route to a reactive stream.
+     *
+     * @param name the stream name
+     * @param producer the producer of the route
+     * @throws IllegalStateException if another producer is already associated with the given stream name
+     */
+    void attachCamelProducer(String name, ReactiveStreamsProducer producer);
+
+    /**
+     * Used by Camel to detach the existing producer from the given stream.
+     *
+     * @param name the stream name
+     */
+    void detachCamelProducer(String name);
+
+    /**
      * Used by Camel to send the exchange to all active subscriptions on the given stream.
      * The callback is used to signal that the exchange has been delivered to the subscribers.
      *
@@ -91,7 +109,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
     void process(String name, Exchange exchange, DispatchCallback<Exchange> callback);
 
     /*
-     * Methods for consumers.
+     * Methods for Camel consumers.
      */
 
     /**
@@ -102,13 +120,13 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param consumer the consumer of the route
      * @throws IllegalStateException if another consumer is already associated with the given stream name
      */
-    void attachConsumer(String name, ReactiveStreamsConsumer consumer);
+    void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer);
 
     /**
      * Used by Camel to detach the existing consumer from the given stream.
      *
      * @param name the stream name
      */
-    void detachConsumer(String name);
+    void detachCamelConsumer(String name);
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
index 6a30625..5cafcd2 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
@@ -28,6 +28,8 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsEndpoint;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
 import org.apache.camel.component.reactive.streams.api.DispatchCallback;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
@@ -49,6 +51,8 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC
 
     private List<CamelSubscription> subscriptions = new CopyOnWriteArrayList<>();
 
+    private ReactiveStreamsProducer producer;
+
     public CamelPublisher(ExecutorService workerPool, CamelContext context, String name) {
         this.workerPool = workerPool;
         this.backpressureStrategy = ((ReactiveStreamsComponent) context.getComponent("reactive-streams")).getBackpressureStrategy();
@@ -98,6 +102,28 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC
         }
     }
 
+
+    public void attachProducer(ReactiveStreamsProducer producer) {
+        Objects.requireNonNull(producer, "producer cannot be null, use the detach method");
+        if (this.producer != null) {
+            throw new IllegalStateException("A producer is already attached to the stream '" + name + "'");
+        }
+        this.producer = producer;
+
+        // Apply endpoint options if available
+        ReactiveStreamsEndpoint endpoint = producer.getEndpoint();
+        if (endpoint.getBackpressureStrategy() != null) {
+            this.backpressureStrategy = endpoint.getBackpressureStrategy();
+            for (CamelSubscription sub : this.subscriptions) {
+                sub.setBackpressureStrategy(endpoint.getBackpressureStrategy());
+            }
+        }
+    }
+
+    public void detachProducer() {
+        this.producer = null;
+    }
+
     @Override
     public void close() throws Exception {
         for (CamelSubscription sub : subscriptions) {

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index ef617be..7b285a6 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
 import org.apache.camel.component.reactive.streams.api.DispatchCallback;
 import org.apache.camel.component.reactive.streams.util.ConvertingPublisher;
@@ -112,16 +113,26 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
     }
 
     @Override
-    public void attachConsumer(String name, ReactiveStreamsConsumer consumer) {
+    public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
         getSubscriber(name).attachConsumer(consumer);
     }
 
     @Override
-    public void detachConsumer(String name) {
+    public void detachCamelConsumer(String name) {
         getSubscriber(name).detachConsumer();
     }
 
     @Override
+    public void attachCamelProducer(String name, ReactiveStreamsProducer producer) {
+        getPayloadPublisher(name).attachProducer(producer);
+    }
+
+    @Override
+    public void detachCamelProducer(String name) {
+        getPayloadPublisher(name).detachProducer();
+    }
+
+    @Override
     public void setCamelContext(CamelContext camelContext) {
         this.context = camelContext;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index 3b7e5d5..89f5afc 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -57,7 +57,7 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
     public void attachConsumer(ReactiveStreamsConsumer consumer) {
         synchronized (this) {
             if (this.consumer != null) {
-                throw new IllegalStateException("A consumer is already attached on stream '" + name + "'");
+                throw new IllegalStateException("A consumer is already attached to the stream '" + name + "'");
             }
             this.consumer = consumer;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index e3489ef..d5f4337 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -248,5 +248,9 @@ public class CamelSubscription implements Subscription {
         checkAndFlush();
     }
 
-
+    public void setBackpressureStrategy(ReactiveStreamsBackpressureStrategy backpressureStrategy) {
+        mutex.lock();
+        this.backpressureStrategy = backpressureStrategy;
+        mutex.unlock();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
index 8846cd1..32e9cab 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
@@ -157,6 +157,48 @@ public class BackpressureStrategyTest extends CamelTestSupport {
         subscriber.cancel();
     }
 
+    @Test
+    public void testBackpressureDropStrategyInEndpoint() throws Exception {
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer:gen?period=20&repeatCount=20")
+                        .setBody().header(Exchange.TIMER_COUNTER)
+                        .to("reactive-streams:integers?backpressureStrategy=DROP");
+            }
+        }.addRoutesToCamelContext(context);
+
+
+        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        final CountDownLatch latch2 = new CountDownLatch(2);
+
+        TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>() {
+            @Override
+            public void onNext(Integer o) {
+                queue.add(o);
+                latch.countDown();
+                latch2.countDown();
+            }
+        };
+        subscriber.setInitiallyRequested(1);
+        CamelReactiveStreams.get(context).getPublisher("integers", Integer.class).subscribe(subscriber);
+
+        context().start();
+
+        assertTrue(latch.await(5, TimeUnit.SECONDS));
+        Thread.sleep(1000); // wait for all numbers to be generated
+
+        subscriber.request(19);
+        assertTrue(latch2.await(1, TimeUnit.SECONDS));
+        Thread.sleep(200); // add other time to ensure no other items arrive
+        assertEquals(2, queue.size());
+        int sum = queue.stream().reduce((i, j) -> i + j).get();
+        assertEquals(3, sum); // 1 + 2 = 3
+
+        subscriber.cancel();
+    }
+
     @Override
     public boolean isUseRouteBuilder() {
         return false;

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java
index 84b15de..11910dc 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java
@@ -104,6 +104,23 @@ public class BasicPublisherTest extends CamelTestSupport {
         disp3.dispose();
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void testOnlyOneCamelProducerPerPublisher() throws Exception {
+
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:one")
+                        .to("reactive-streams:stream");
+
+                from("direct:two")
+                        .to("reactive-streams:stream");
+            }
+        }.addRoutesToCamelContext(context);
+
+        context.start();
+    }
+
     @Override
     public boolean isUseRouteBuilder() {
         return false;

http://git-wip-us.apache.org/repos/asf/camel/blob/b64250b0/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index b46c9ff..0301757 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.reactive.streams.support;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
 import org.apache.camel.component.reactive.streams.api.DispatchCallback;
 import org.reactivestreams.Publisher;
@@ -84,12 +85,22 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
     }
 
     @Override
-    public void attachConsumer(String name, ReactiveStreamsConsumer consumer) {
+    public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
 
     }
 
     @Override
-    public void detachConsumer(String name) {
+    public void detachCamelConsumer(String name) {
+
+    }
+
+    @Override
+    public void attachCamelProducer(String name, ReactiveStreamsProducer producer) {
+
+    }
+
+    @Override
+    public void detachCamelProducer(String name) {
 
     }