You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/02/02 16:06:41 UTC

[1/2] camel git commit: CAMEL-10612: added more examples and improved interface

Repository: camel
Updated Branches:
  refs/heads/master 1885db212 -> b4f75905b


CAMEL-10612: added more examples and improved interface


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b4f75905
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b4f75905
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b4f75905

Branch: refs/heads/master
Commit: b4f75905b9f232a96846f7d2e22fe8b4fbe9fdb9
Parents: 5108cb5
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Thu Feb 2 17:06:02 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Thu Feb 2 17:06:20 2017 +0100

----------------------------------------------------------------------
 .../streams/ReactiveStreamsConverter.java       |   8 -
 .../api/CamelReactiveStreamsService.java        |  30 +++-
 .../engine/CamelReactiveStreamsServiceImpl.java |  11 ++
 .../streams/util/UnwrapStreamProcessor.java     |  90 +++++++++++
 .../streams/util/UnwrappingStreamProcessor.java |  78 ---------
 .../reactive/streams/BeanCallTest.java          |  11 +-
 .../support/ReactiveStreamsTestService.java     |  12 ++
 examples/camel-example-reactive-streams/pom.xml |   4 +
 .../streams/BasicCamelToReactorExample.java     |  98 ++++++++++++
 .../BasicCamelToReactorInOutExample.java        | 158 +++++++++++++++++++
 .../streams/BasicReactorToCamelExample.java     |  88 +++++++++++
 .../BasicReactorToCamelInOutExample.java        |  90 +++++++++++
 .../streams/ReactiveStreamsSpringBootApp.java   |  31 ----
 .../example/reactive/streams/RestExample.java   |  82 ++++++++++
 .../reactive/streams/SampleCamelRoutes.java     |  40 -----
 .../reactive/streams/SampleReactiveStreams.java |  52 ------
 .../app/ReactiveStreamsSpringBootApp.java       |  31 ++++
 .../main/resources/META-INF/spring.factories    |  24 +++
 .../src/main/resources/application.yml          |  12 ++
 19 files changed, 735 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java
index 23f49c9..1fd61ae 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java
@@ -16,21 +16,13 @@
  */
 package org.apache.camel.component.reactive.streams;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Converter;
 import org.apache.camel.Exchange;
 import org.apache.camel.FallbackConverter;
-import org.apache.camel.Processor;
-import org.apache.camel.component.bean.BeanInfo;
-import org.apache.camel.component.bean.BeanProcessor;
-import org.apache.camel.component.bean.ConstantBeanHolder;
 import org.apache.camel.component.reactive.streams.util.MonoPublisher;
-import org.apache.camel.component.reactive.streams.util.UnwrappingStreamProcessor;
 import org.apache.camel.spi.TypeConverterRegistry;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.reactivestreams.Publisher;
 
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index 3a7acd4..03ca6a0 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.reactive.streams.api;
 
+import java.util.function.Function;
+
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Service;
@@ -83,11 +85,22 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      *
      * @param name the stream name
      * @param data the data to push
-     * @return an publisher with the resulting exchange
+     * @return a publisher with the resulting exchange
      */
     Publisher<Exchange> request(String name, Object data);
 
     /**
+     * Returns a function that pushes data into the specified Camel stream and
+     * returns a Publisher (mono) holding the resulting exchange or an error.
+     *
+     * This is a curryied version of {@link CamelReactiveStreamsService#request(String, Object)}.
+     *
+     * @param name the stream name
+     * @return a function that returns a publisher with the resulting exchange
+     */
+    Function<?, ? extends Publisher<Exchange>> request(String name);
+
+    /**
      * Pushes the given data into the specified Camel stream and returns a Publisher (mono) holding
      * the exchange output or an error.
      *
@@ -95,10 +108,23 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param data the data to push
      * @param type  the type to which the output should be converted
      * @param <T> the generic type of the resulting Publisher
-     * @return an publisher with the resulting data
+     * @return a publisher with the resulting data
      */
     <T> Publisher<T> request(String name, Object data, Class<T> type);
 
+    /**
+     * Returns a function that pushes data into the specified Camel stream and
+     * returns a Publisher (mono) holding the exchange output or an error.
+     *
+     * This is a curryied version of {@link CamelReactiveStreamsService#request(String, Object, Class)}.
+     *
+     * @param name the stream name
+     * @param type  the type to which the output should be converted
+     * @param <T> the generic type of the resulting Publisher
+     * @return a function that returns a publisher with the resulting data
+     */
+    <T> Function<Object, Publisher<T>> request(String name, Class<T> type);
+
     /*
      * Methods for Camel producers.
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index edffb9e..4b67ce0 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.reactive.streams.engine;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -120,6 +121,11 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
     }
 
     @Override
+    public Function<?, ? extends Publisher<Exchange>> request(String name) {
+        return data -> request(name, data);
+    }
+
+    @Override
     public <T> Publisher<T> request(String name, Object data, Class<T> type) {
         return new ConvertingPublisher<>(request(name, data), type);
     }
@@ -154,6 +160,11 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
         return publisher;
     }
 
+    @Override
+    public <T> Function<Object, Publisher<T>> request(String name, Class<T> type) {
+        return data -> request(name, data, type);
+    }
+
     private CamelPublisher getPayloadPublisher(String name) {
         synchronized (this) {
             if (!publishers.containsKey(name)) {

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
new file mode 100644
index 0000000..6800ce0
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.util;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+/**
+ * A Processor that converts a Publisher into its content asynchronously.
+ */
+public class UnwrapStreamProcessor implements AsyncProcessor {
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        Object content = exchange.getIn().getBody();
+        if (content instanceof Publisher) {
+            Publisher<?> pub = Publisher.class.cast(content);
+
+            List<Object> data = new LinkedList<>();
+
+            pub.subscribe(new Subscriber<Object>() {
+
+                @Override
+                public void onSubscribe(Subscription subscription) {
+                    subscription.request(Long.MAX_VALUE);
+                }
+
+                @Override
+                public void onNext(Object o) {
+                    data.add(o);
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    addData();
+                    exchange.setException(throwable);
+                    callback.done(false);
+                }
+
+                @Override
+                public void onComplete() {
+                    addData();
+                    callback.done(false);
+                }
+
+                private void addData() {
+                    Object body;
+                    if (data.size() == 0) {
+                        body = null;
+                    } else if (data.size() == 1) {
+                        body = data.get(0);
+                    } else {
+                        body = data;
+                    }
+                    exchange.getIn().setBody(body);
+                }
+
+            });
+        }
+        return false;
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java
deleted file mode 100644
index fa85fd2..0000000
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.reactive.streams.util;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Exchange;
-import org.apache.camel.util.AsyncProcessorHelper;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-/**
- * A Processor that converts a Publisher into its content asynchronously.
- */
-public class UnwrappingStreamProcessor implements AsyncProcessor {
-
-    @Override
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        Object content = exchange.getIn().getBody();
-        if (content instanceof Publisher) {
-            Publisher<?> pub = Publisher.class.cast(content);
-
-            List<Object> data = new LinkedList<>();
-
-            pub.subscribe(new Subscriber<Object>() {
-
-                @Override
-                public void onSubscribe(Subscription subscription) {
-                    subscription.request(Long.MAX_VALUE);
-                }
-
-                @Override
-                public void onNext(Object o) {
-                    data.add(o);
-                }
-
-                @Override
-                public void onError(Throwable throwable) {
-                    exchange.getIn().setBody(data);
-                    exchange.setException(throwable);
-                    callback.done(false);
-                }
-
-                @Override
-                public void onComplete() {
-                    exchange.getIn().setBody(data);
-                    callback.done(false);
-                }
-
-            });
-        }
-        return false;
-    }
-
-    @Override
-    public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
index 1f05a00..5a532ae 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
@@ -22,7 +22,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Header;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.reactive.streams.util.UnwrappingStreamProcessor;
+import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 import org.reactivestreams.Publisher;
@@ -42,7 +42,8 @@ public class BeanCallTest extends CamelTestSupport {
 
                 from("direct:num")
                         .bean(BeanCallTest.this, "processBody")
-                        .process(new UnwrappingStreamProcessor()).split().body() // Can be removed?
+                        .process(new UnwrapStreamProcessor()) // Can be removed?
+                        .split().body()
                         .to("mock:endpoint");
 
                 from("direct:handle")
@@ -74,7 +75,8 @@ public class BeanCallTest extends CamelTestSupport {
 
                 from("direct:num")
                         .bean(BeanCallTest.this, "processBodyWrongType")
-                        .process(new UnwrappingStreamProcessor()).split().body() // Can be removed?
+                        .process(new UnwrapStreamProcessor()) // Can be removed?
+                        .split().body()
                         .to("mock:endpoint");
 
                 from("direct:handle")
@@ -105,7 +107,8 @@ public class BeanCallTest extends CamelTestSupport {
 
                 from("direct:num")
                         .bean(BeanCallTest.this, "processHeader")
-                        .process(new UnwrappingStreamProcessor()).split().body() // Can be removed?
+                        .process(new UnwrapStreamProcessor()) // Can be removed?
+                        .split().body()
                         .to("mock:endpoint");
 
                 from("direct:handle")

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index 5288263..6ab9c5e 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.reactive.streams.support;
 
+import java.util.function.Function;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
@@ -114,6 +116,16 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
         return null;
     }
 
+    @Override
+    public Function<?, ? extends Publisher<Exchange>> request(String name) {
+        return null;
+    }
+
+    @Override
+    public <T> Function<Object, Publisher<T>> request(String name, Class<T> type) {
+        return null;
+    }
+
     public String getName() {
         return name;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/pom.xml
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/pom.xml b/examples/camel-example-reactive-streams/pom.xml
index abbbe20..10c1a23 100644
--- a/examples/camel-example-reactive-streams/pom.xml
+++ b/examples/camel-example-reactive-streams/pom.xml
@@ -47,6 +47,10 @@
             <artifactId>camel-reactive-streams-starter</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-servlet-starter</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorExample.java
new file mode 100644
index 0000000..1929255
--- /dev/null
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorExample.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.reactive.streams;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * This example shows how a reactive stream framework can subscribe to events published
+ * by Camel routes.
+ *
+ * The exchange pattern is in-only from Camel to Reactor.
+ *
+ * Note: the Camel and reactor components are placed in the same configuration class for the sake of clarity,
+ * but they can be moved in their own files.
+ */
+@Configuration
+@ConditionalOnProperty("examples.basic.camel-to-reactor")
+public class BasicCamelToReactorExample {
+
+    /**
+     * The reactor streams.
+     */
+    @Component
+    public static class BasicCamelToReactorExampleStreams {
+        private static final Logger LOG = LoggerFactory.getLogger(BasicCamelToReactorExample.class);
+
+        @Autowired
+        private CamelReactiveStreamsService camel;
+
+
+        @PostConstruct
+        public void setupStreams() {
+
+            // Use two streams from Camel
+            Publisher<Integer> numbers = camel.getPublisher("numbers", Integer.class);
+            Publisher<String> strings = camel.getPublisher("strings", String.class);
+
+            Flux.from(numbers)
+                    .zipWith(strings) // emit items in pairs
+                    .map(tuple -> "BasicCamelToReactor - " + tuple.getT1() + " -> " + tuple.getT2())
+                    .doOnNext(LOG::info)
+                    .subscribe();
+        }
+
+    }
+
+
+    /**
+     * The Camel Configuration.
+     */
+    @Component
+    public static class BasicCamelToReactorExampleRoutes extends RouteBuilder {
+
+        @Override
+        public void configure() throws Exception {
+
+            // Generating numbers every 5 seconds and forwarding to the stream "numbers"
+            from("timer:clock?period=5000")
+                    .setBody().header(Exchange.TIMER_COUNTER)
+                    .to("reactive-streams:numbers");
+
+            // Generating strings every 4.9 seconds and forwarding to the stream "strings"
+            from("timer:clock2?period=4900&delay=2000")
+                    .setBody().simple("Hello World ${header.CamelTimerCounter}!")
+                    .to("reactive-streams:strings");
+
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorInOutExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorInOutExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorInOutExample.java
new file mode 100644
index 0000000..1d0024c
--- /dev/null
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorInOutExample.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.reactive.streams;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
+import org.reactivestreams.Publisher;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * This example shows how Camel can asynchronously request data to a reactive stream framework
+ * and continue processing.
+ *
+ * The exchange pattern is in-out from Camel to Reactor.
+ *
+ * Note: the Camel and reactor components are placed in the same configuration class for the sake of clarity,
+ * but they can be moved in their own files.
+ */
+@Configuration
+@ConditionalOnProperty("examples.basic.camel-to-reactor-in-out")
+public class BasicCamelToReactorInOutExample {
+
+    /**
+     * The reactor streams.
+     */
+    @Component("userBean")
+    public static class BasicCamelToReactorInOutExampleStreams {
+
+        /**
+         * This method will be called by a Camel route.
+         */
+        public Publisher<UserInfo> getUserInfo(Publisher<Long> userId) {
+            return Flux.from(userId)
+                    .map(UserInfo::new)
+                    .flatMap(this::retrieveAddress)
+                    .flatMap(this::retrieveName);
+        }
+
+        /**
+         * This is a sample utility method.
+         */
+        private Publisher<UserInfo> retrieveAddress(UserInfo user) {
+            // you can do an async database retrieval here
+            return Flux.just(user.withAddress("Address" + user.getId()));
+        }
+
+        private Publisher<UserInfo> retrieveName(UserInfo user) {
+            // you can do an async database retrieval here
+            return Flux.just(user.withName("Name" + user.getId()));
+        }
+
+    }
+
+
+    /**
+     * The Camel Configuration.
+     */
+    @Component
+    public static class BasicCamelToReactorInOutExampleRoutes extends RouteBuilder {
+
+        @Override
+        public void configure() throws Exception {
+
+            // Generate a Id and retrieve user data from reactor
+            from("timer:clock?period=9000&delay=1500")
+                    .setBody().header(Exchange.TIMER_COUNTER).convertBodyTo(Long.class) // Sample ID
+                    .bean("userBean", "getUserInfo") // Get the user info from reactor code
+                    .process(new UnwrapStreamProcessor()).split().body() // Unwrap the Publisher
+                    .log("BasicCamelToReactorInOut - Got ${body}");
+
+        }
+
+    }
+
+
+    /**
+     * Sample bean used in the example.
+     */
+    public static class UserInfo {
+
+        private Long id;
+
+        private String name;
+
+        private String address;
+
+        public UserInfo() {
+        }
+
+        public UserInfo(Long id) {
+            this.id = id;
+        }
+
+        public Long getId() {
+            return id;
+        }
+
+        public void setId(Long id) {
+            this.id = id;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public UserInfo withName(String name) {
+            this.name = name;
+            return this;
+        }
+
+        public String getAddress() {
+            return address;
+        }
+
+        public void setAddress(String address) {
+            this.address = address;
+        }
+
+        public UserInfo withAddress(String address) {
+            this.address = address;
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder sb = new StringBuilder("UserInfo{");
+            sb.append("id=").append(id);
+            sb.append(", name='").append(name).append('\'');
+            sb.append(", address='").append(address).append('\'');
+            sb.append('}');
+            return sb.toString();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelExample.java
new file mode 100644
index 0000000..5a71eb5
--- /dev/null
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelExample.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.reactive.streams;
+
+import java.time.Duration;
+import javax.annotation.PostConstruct;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.reactivestreams.Subscriber;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * This example shows how a reactive stream framework can publish events that are consumed
+ * by Camel routes.
+ *
+ * The exchange pattern is in-only from Reactor to Camel.
+ *
+ * Note: the Camel and reactor components are placed in the same configuration class for the sake of clarity,
+ * but they can be moved in their own files.
+ */
+@Configuration
+@ConditionalOnProperty("examples.basic.reactor-to-camel")
+public class BasicReactorToCamelExample {
+
+    /**
+     * The reactor streams.
+     */
+    @Component
+    public static class BasicReactorToCamelExampleStreams {
+
+        @Autowired
+        private CamelReactiveStreamsService camel;
+
+
+        @PostConstruct
+        public void setupStreams() {
+
+            // Get a subscriber from camel
+            Subscriber<String> elements = camel.getSubscriber("elements", String.class);
+
+            // Emit a string every 7 seconds and push it to the Camel "elements" stream
+            Flux.interval(Duration.ofSeconds(7))
+                    .map(item -> "element " + item)
+                    .subscribe(elements);
+
+        }
+    }
+
+
+    /**
+     * The Camel Configuration.
+     */
+    @Component
+    public static class BasicReactorToCamelExampleRoutes extends RouteBuilder {
+
+        @Override
+        public void configure() throws Exception {
+
+            // Transform the body of received items and log
+            from("reactive-streams:elements")
+                    .setBody().simple("BasicReactorToCamel - Camel received ${body}")
+                    .to("log:INFO");
+
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelInOutExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelInOutExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelInOutExample.java
new file mode 100644
index 0000000..36bc59e
--- /dev/null
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelInOutExample.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.reactive.streams;
+
+import java.time.Duration;
+import javax.annotation.PostConstruct;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * This example shows how a reactive stream framework can asynchronously request data to a Camel stream
+ * and continue processing.
+ *
+ * The exchange pattern is in-out from Reactor to Camel.
+ *
+ * Note: the Camel and reactor components are placed in the same configuration class for the sake of clarity,
+ * but they can be moved in their own files.
+ */
+@Configuration
+@ConditionalOnProperty("examples.basic.reactor-to-camel-in-out")
+public class BasicReactorToCamelInOutExample {
+
+    /**
+     * The reactor streams.
+     */
+    @Component
+    public static class BasicReactorToCamelInOutExampleStreams {
+        private static final Logger LOG = LoggerFactory.getLogger(BasicReactorToCamelInOutExample.class);
+
+        @Autowired
+        private CamelReactiveStreamsService camel;
+
+
+        @PostConstruct
+        public void setupStreams() {
+
+            Flux.interval(Duration.ofSeconds(8))
+                    .map(i -> i + 1) // to start from 1
+                    .flatMap(camel.request("sqrt", Double.class)) // call Camel and continue
+                    .map(d -> "BasicReactorToCamelInOut - sqrt=" + d)
+                    .doOnNext(LOG::info)
+                    .subscribe();
+
+        }
+    }
+
+
+    /**
+     * The Camel Configuration.
+     */
+    @Component
+    public static class BasicReactorToCamelInOutExampleRoutes extends RouteBuilder {
+
+        @Override
+        public void configure() throws Exception {
+
+            // Transform the body of every exchange into its square root
+            from("reactive-streams:sqrt")
+                    .setBody().body(Double.class, Math::sqrt);
+
+            // This route can be much more complex: it can use any Camel component to compute the output data
+
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ReactiveStreamsSpringBootApp.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ReactiveStreamsSpringBootApp.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ReactiveStreamsSpringBootApp.java
deleted file mode 100644
index 6440d6b..0000000
--- a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ReactiveStreamsSpringBootApp.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.example.reactive.streams;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-//CHECKSTYLE:OFF
-@SpringBootApplication
-public class ReactiveStreamsSpringBootApp {
-
-    public static void main(String[] args) {
-        SpringApplication.run(ReactiveStreamsSpringBootApp.class, args);
-    }
-
-}
-//CHECKSTYLE:ON

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/RestExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/RestExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/RestExample.java
new file mode 100644
index 0000000..4a650d2
--- /dev/null
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/RestExample.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.reactive.streams;
+
+import org.apache.camel.Header;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
+import org.reactivestreams.Publisher;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * This example shows how a Camel route defined with rest DSL can call a can asynchronously request data to a reactive stream framework
+ * and continue processing.
+ *
+ * The exchange pattern is in-out from Camel to Reactor.
+ *
+ * Note: the Camel and reactor components are placed in the same configuration class for the sake of clarity,
+ * but they can be moved in their own files.
+ */
+@Configuration
+@ConditionalOnProperty("examples.others.rest-example")
+public class RestExample {
+
+    /**
+     * The reactor streams.
+     */
+    @Component("calculator")
+    public static class RestExampleStreams {
+
+        /**
+         * This method will be called by a Camel route.
+         */
+        public Publisher<Long> sum(@Header("num1") Publisher<Long> num1, @Header("num2") Publisher<Long> num2) {
+            return Flux.from(num1).zipWith(num2)
+                    .map(t -> t.getT1() + t.getT2());
+        }
+
+    }
+
+
+    /**
+     * The Camel Configuration.
+     */
+    @Component
+    public static class RestExampleRoutes extends RouteBuilder {
+
+        @Override
+        public void configure() throws Exception {
+
+            // The full path should be eg.: http://localhost:8080/camel/sum/23/31
+            rest().get("/sum/{num1}/{num2}")
+                    .produces("text/plain")
+                    .route()
+                    .setHeader("num1").simple("headerAs(num1,Long)")
+                    .setHeader("num2").simple("headerAs(num2,Long)")
+                    .bean("calculator", "sum")
+                    .process(new UnwrapStreamProcessor())
+                    .setBody().simple("The result is: ${body}");
+
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleCamelRoutes.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleCamelRoutes.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleCamelRoutes.java
deleted file mode 100644
index cc5b59b..0000000
--- a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleCamelRoutes.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.example.reactive.streams;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.springframework.stereotype.Component;
-
-@Component
-public class SampleCamelRoutes extends RouteBuilder {
-
-    @Override
-    public void configure() throws Exception {
-
-        // Generating numbers every 3 seconds and forwarding to the stream "numbers"
-        from("timer:clock?period=3000")
-                .setBody().header(Exchange.TIMER_COUNTER)
-                .to("reactive-streams:numbers");
-
-        // Generating strings every 2.9 seconds and forwarding to the stream "strings"
-        from("timer:clock2?period=2900&delay=1000")
-                .setBody().simple("Hello World ${header.CamelTimerCounter}!")
-                .to("reactive-streams:strings");
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleReactiveStreams.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleReactiveStreams.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleReactiveStreams.java
deleted file mode 100644
index 271e209..0000000
--- a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleReactiveStreams.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.example.reactive.streams;
-
-import javax.annotation.PostConstruct;
-
-import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
-import org.reactivestreams.Publisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import reactor.core.publisher.Flux;
-
-@Component
-public class SampleReactiveStreams {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SampleReactiveStreams.class);
-
-    @Autowired
-    private CamelReactiveStreamsService camelStreams;
-
-    @PostConstruct
-    public void configure() {
-
-        Publisher<Integer> numbers = camelStreams.getPublisher("numbers", Integer.class);
-        Publisher<String> strings = camelStreams.getPublisher("strings", String.class);
-
-        Flux.from(numbers)
-                .zipWith(strings)
-                .map(tuple -> "Seq: " + tuple.getT1() + " - " + tuple.getT2())
-                .doOnNext(LOG::info)
-                .subscribe();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/app/ReactiveStreamsSpringBootApp.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/app/ReactiveStreamsSpringBootApp.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/app/ReactiveStreamsSpringBootApp.java
new file mode 100644
index 0000000..10a62a1
--- /dev/null
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/app/ReactiveStreamsSpringBootApp.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.reactive.streams.app;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+//CHECKSTYLE:OFF
+@SpringBootApplication
+public class ReactiveStreamsSpringBootApp {
+
+    public static void main(String[] args) {
+        SpringApplication.run(ReactiveStreamsSpringBootApp.class, args);
+    }
+
+}
+//CHECKSTYLE:ON

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/resources/META-INF/spring.factories
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/resources/META-INF/spring.factories b/examples/camel-example-reactive-streams/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000..8be4929
--- /dev/null
+++ b/examples/camel-example-reactive-streams/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+org.apache.camel.example.reactive.streams.BasicCamelToReactorExample,\
+org.apache.camel.example.reactive.streams.BasicReactorToCamelExample,\
+org.apache.camel.example.reactive.streams.BasicReactorToCamelInOutExample,\
+org.apache.camel.example.reactive.streams.BasicCamelToReactorInOutExample,\
+org.apache.camel.example.reactive.streams.RestExample
+

http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/resources/application.yml
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/resources/application.yml b/examples/camel-example-reactive-streams/src/main/resources/application.yml
index d5e4f9b..eeeb3ac 100644
--- a/examples/camel-example-reactive-streams/src/main/resources/application.yml
+++ b/examples/camel-example-reactive-streams/src/main/resources/application.yml
@@ -15,3 +15,15 @@
 ## limitations under the License.
 ## ------------------------------------------------------------------------
 
+# All examples are enabled by default.
+# You can turn them on-off here
+examples:
+  basic:
+    camel-to-reactor: true
+    reactor-to-camel: true
+    camel-to-reactor-in-out: true
+    reactor-to-camel-in-out: true
+  others:
+    rest-example: true
+
+


[2/2] camel git commit: CAMEL-10612: allow a stream to request data to Camel

Posted by nf...@apache.org.
CAMEL-10612: allow a stream to request data to Camel


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5108cb51
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5108cb51
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5108cb51

Branch: refs/heads/master
Commit: 5108cb512f326ff690337f9e1fcfdaffa089601b
Parents: 1885db2
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Thu Feb 2 10:58:14 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Thu Feb 2 17:06:20 2017 +0100

----------------------------------------------------------------------
 .../api/CamelReactiveStreamsService.java        |  22 ++
 .../engine/CamelReactiveStreamsServiceImpl.java |  55 +++-
 .../streams/engine/CamelSubscriber.java         |   5 +
 .../streams/engine/DelayedMonoPublisher.java    | 188 ++++++++++++++
 .../streams/DelayedMonoPublisherTest.java       | 257 +++++++++++++++++++
 .../reactive/streams/ExchangeRequestTest.java   |  89 +++++++
 .../support/ReactiveStreamsTestService.java     |  10 +
 7 files changed, 625 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index b865a48..3a7acd4 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -77,6 +77,28 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      */
     <T> Subscriber<T> getSubscriber(String name, Class<T> type);
 
+    /**
+     * Pushes the given data into the specified Camel stream and returns a Publisher (mono) holding
+     * the resulting exchange or an error.
+     *
+     * @param name the stream name
+     * @param data the data to push
+     * @return an publisher with the resulting exchange
+     */
+    Publisher<Exchange> request(String name, Object data);
+
+    /**
+     * Pushes the given data into the specified Camel stream and returns a Publisher (mono) holding
+     * the exchange output or an error.
+     *
+     * @param name the stream name
+     * @param data the data to push
+     * @param type  the type to which the output should be converted
+     * @param <T> the generic type of the resulting Publisher
+     * @return an publisher with the resulting data
+     */
+    <T> Publisher<T> request(String name, Object data, Class<T> type);
+
     /*
      * Methods for Camel producers.
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index 7b285a6..edffb9e 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
@@ -29,10 +30,14 @@ import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsServi
 import org.apache.camel.component.reactive.streams.api.DispatchCallback;
 import org.apache.camel.component.reactive.streams.util.ConvertingPublisher;
 import org.apache.camel.component.reactive.streams.util.ConvertingSubscriber;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.spi.Synchronization;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 
-
+/**
+ * The default implementation of the reactive streams service.
+ */
 public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsService {
 
     private CamelContext context;
@@ -100,6 +105,54 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
         getPayloadPublisher(name).publish(payload);
     }
 
+    @Override
+    public Publisher<Exchange> request(String name, Object data) {
+        Exchange exchange;
+        if (data instanceof Exchange) {
+            exchange = (Exchange) data;
+        } else {
+            exchange = new DefaultExchange(context);
+            exchange.setPattern(ExchangePattern.InOut);
+            exchange.getIn().setBody(data);
+        }
+
+        return doRequest(name, exchange);
+    }
+
+    @Override
+    public <T> Publisher<T> request(String name, Object data, Class<T> type) {
+        return new ConvertingPublisher<>(request(name, data), type);
+    }
+
+    protected Publisher<Exchange> doRequest(String name, Exchange data) {
+        ReactiveStreamsConsumer consumer = getSubscriber(name).getConsumer();
+        if (consumer == null) {
+            throw new IllegalStateException("No consumers attached to the stream " + name);
+        }
+
+        DelayedMonoPublisher<Exchange> publisher = new DelayedMonoPublisher<>(this.workerPool);
+
+        data.addOnCompletion(new Synchronization() {
+            @Override
+            public void onComplete(Exchange exchange) {
+                publisher.setData(exchange);
+            }
+
+            @Override
+            public void onFailure(Exchange exchange) {
+                Throwable throwable = exchange.getException();
+                if (throwable == null) {
+                    throwable = new IllegalStateException("Unknown Exception");
+                }
+                publisher.setException(throwable);
+            }
+        });
+
+        consumer.process(data, doneSync -> {
+        });
+
+        return publisher;
+    }
 
     private CamelPublisher getPayloadPublisher(String name) {
         synchronized (this) {

http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index eda2863..098d0ca 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -64,6 +64,10 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
         refill();
     }
 
+    public synchronized ReactiveStreamsConsumer getConsumer() {
+        return consumer;
+    }
+
     public void detachConsumer() {
         synchronized (this) {
             this.consumer = null;
@@ -86,6 +90,7 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
         }
 
         if (!allowed) {
+            LOG.warn("There is another active subscription: cancelled");
             subscription.cancel();
         } else {
             refill();

http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
new file mode 100644
index 0000000..ef8ece1
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.engine;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Publish a single item as soon as it's available.
+ */
+public class DelayedMonoPublisher<T> implements Publisher<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DelayedMonoPublisher.class);
+
+    private ExecutorService workerPool;
+
+    private volatile T data;
+
+    private volatile Throwable exception;
+
+    private List<MonoSubscription> subscriptions = new CopyOnWriteArrayList<>();
+
+    private AtomicBoolean flushing = new AtomicBoolean(false);
+
+    public DelayedMonoPublisher(ExecutorService workerPool) {
+        this.workerPool = workerPool;
+    }
+
+    @Override
+    public void subscribe(Subscriber<? super T> subscriber) {
+        Objects.requireNonNull(subscriber, "subscriber must not be null");
+        MonoSubscription sub = new MonoSubscription(subscriber);
+        subscriptions.add(sub);
+        subscriber.onSubscribe(sub);
+        flushCycle();
+    }
+
+    public T getData() {
+        return data;
+    }
+
+    public void setData(T data) {
+        Objects.requireNonNull(data, "data must be not null");
+        if (this.data != null) {
+            throw new IllegalStateException("data has already been set");
+        } else if (this.exception != null) {
+            throw new IllegalStateException("an exception has already been set");
+        }
+
+        this.data = data;
+        flushCycle();
+    }
+
+    public Throwable getException() {
+        return exception;
+    }
+
+    public void setException(Throwable exception) {
+        Objects.requireNonNull(exception, "exception must be not null");
+        if (this.data != null) {
+            throw new IllegalStateException("data has already been set");
+        } else if (this.exception != null) {
+            throw new IllegalStateException("an exception has already been set");
+        }
+
+        this.exception = exception;
+        flushCycle();
+    }
+
+    private void flushCycle() {
+        boolean notRunning = flushing.compareAndSet(false, true);
+
+        if (notRunning) {
+            workerPool.execute(() -> {
+                try {
+                    List<MonoSubscription> completed = new LinkedList<>();
+                    for (MonoSubscription sub : this.subscriptions) {
+                        sub.flush();
+                        if (sub.isTerminated()) {
+                            completed.add(sub);
+                        }
+                    }
+                    this.subscriptions.removeAll(completed);
+                } finally {
+                    flushing.set(false);
+                }
+
+                boolean runAgain = false;
+                for (MonoSubscription sub : this.subscriptions) {
+                    if (sub.isReady()) {
+                        runAgain = true;
+                        break;
+                    }
+                }
+                if (runAgain) {
+                    flushCycle();
+                }
+
+            });
+        }
+    }
+
+    private final class MonoSubscription implements Subscription {
+
+        private volatile boolean terminated;
+
+        private volatile boolean requested;
+
+        private Subscriber<? super T> subscriber;
+
+        private MonoSubscription(Subscriber<? super T> subscriber) {
+            this.subscriber = subscriber;
+        }
+
+        @Override
+        public void request(long l) {
+            if (terminated) {
+                throw new IllegalStateException("The subscription is terminated");
+            }
+
+            if (l <= 0) {
+                subscriber.onError(new IllegalArgumentException("3.9"));
+                synchronized (this) {
+                    terminated = true;
+                }
+            } else {
+                synchronized (this) {
+                    requested = true;
+                }
+            }
+
+        }
+
+        public void flush() {
+            synchronized (this) {
+                if (!isReady()) {
+                    return;
+                }
+
+                terminated = true;
+            }
+
+            if (data != null) {
+                subscriber.onNext(data);
+                subscriber.onComplete();
+            } else {
+                subscriber.onError(exception);
+            }
+        }
+
+        public boolean isTerminated() {
+            return terminated;
+        }
+
+        public boolean isReady() {
+            return !terminated && requested && (data != null || exception != null);
+        }
+
+        @Override
+        public synchronized void cancel() {
+            terminated = true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
new file mode 100644
index 0000000..011225e
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import java.util.LinkedList;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import io.reactivex.Flowable;
+
+import org.apache.camel.component.reactive.streams.engine.DelayedMonoPublisher;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class DelayedMonoPublisherTest {
+
+    private ExecutorService service;
+
+    @Before
+    public void init() {
+        service = new ScheduledThreadPoolExecutor(3);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        service.shutdown();
+        service.awaitTermination(1, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testAlreadyAvailable() throws Exception {
+
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+        pub.setData(5);
+
+        LinkedList<Integer> data = new LinkedList<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        Flowable.fromPublisher(pub)
+                .doOnNext(data::add)
+                .doOnComplete(latch::countDown)
+                .subscribe();
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+        assertEquals(1, data.size());
+        assertEquals(5, data.get(0).intValue());
+    }
+
+    @Test
+    public void testExceptionAlreadyAvailable() throws Exception {
+
+        Exception ex = new RuntimeException("An exception");
+
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+        pub.setException(ex);
+
+        LinkedList<Throwable> exceptions = new LinkedList<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        Flowable.fromPublisher(pub)
+                .doOnError(exceptions::add)
+                .doOnError(e -> latch.countDown())
+                .subscribe();
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+        assertEquals(1, exceptions.size());
+        assertEquals(ex, exceptions.get(0));
+    }
+
+    @Test
+    public void testAvailableSoon() throws Exception {
+
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+        LinkedList<Integer> data = new LinkedList<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        Flowable.fromPublisher(pub)
+                .doOnNext(data::add)
+                .doOnComplete(latch::countDown)
+                .subscribe();
+
+        Thread.yield();
+        pub.setData(5);
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+        assertEquals(1, data.size());
+        assertEquals(5, data.get(0).intValue());
+    }
+
+    @Test
+    public void testAvailableLater() throws Exception {
+
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+        LinkedList<Integer> data = new LinkedList<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        Flowable.fromPublisher(pub)
+                .doOnNext(data::add)
+                .doOnComplete(latch::countDown)
+                .subscribe();
+
+        Thread.sleep(200);
+        pub.setData(5);
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+        assertEquals(1, data.size());
+        assertEquals(5, data.get(0).intValue());
+    }
+
+    @Test
+    public void testMultipleSubscribers() throws Exception {
+
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+        ConcurrentLinkedDeque<Integer> data = new ConcurrentLinkedDeque<>();
+        CountDownLatch latch = new CountDownLatch(2);
+
+        Flowable.fromPublisher(pub)
+                .doOnNext(data::add)
+                .doOnComplete(latch::countDown)
+                .subscribe();
+
+        Flowable.fromPublisher(pub)
+                .doOnNext(data::add)
+                .doOnComplete(latch::countDown)
+                .subscribe();
+
+        Thread.sleep(200);
+        pub.setData(5);
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+        assertEquals(2, data.size());
+        for (Integer n : data) {
+            assertEquals(5, n.intValue());
+        }
+    }
+
+    @Test
+    public void testMultipleSubscribersMixedArrival() throws Exception {
+
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+        ConcurrentLinkedDeque<Integer> data = new ConcurrentLinkedDeque<>();
+        CountDownLatch latch = new CountDownLatch(2);
+
+        Flowable.fromPublisher(pub)
+                .doOnNext(data::add)
+                .doOnComplete(latch::countDown)
+                .subscribe();
+
+        Thread.sleep(200);
+        pub.setData(5);
+
+        Flowable.fromPublisher(pub)
+                .doOnNext(data::add)
+                .doOnComplete(latch::countDown)
+                .subscribe();
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+        assertEquals(2, data.size());
+        for (Integer n : data) {
+            assertEquals(5, n.intValue());
+        }
+    }
+
+    @Test
+    public void testMultipleSubscribersMixedArrivalException() throws Exception {
+
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+        Exception ex = new RuntimeException("An exception");
+
+        ConcurrentLinkedDeque<Throwable> exceptions = new ConcurrentLinkedDeque<>();
+        CountDownLatch latch = new CountDownLatch(2);
+
+        Flowable.fromPublisher(pub)
+                .doOnError(exceptions::add)
+                .doOnError(e -> latch.countDown())
+                .subscribe();
+
+        Thread.sleep(200);
+        pub.setException(ex);
+
+        Flowable.fromPublisher(pub)
+                .doOnError(exceptions::add)
+                .doOnError(e -> latch.countDown())
+                .subscribe();
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+        assertEquals(2, exceptions.size());
+        for (Throwable t : exceptions) {
+            assertEquals(ex, t);
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testDataOrExceptionAllowed() throws Exception {
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+        Exception ex = new RuntimeException("An exception");
+        pub.setException(ex);
+        pub.setData(1);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testDataOrExceptionAllowed2() throws Exception {
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+        pub.setData(1);
+        Exception ex = new RuntimeException("An exception");
+        pub.setException(ex);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testOnlyOneDataAllowed() throws Exception {
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+        pub.setData(1);
+        pub.setData(2);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testOnlyOneExceptionAllowed() throws Exception {
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+        pub.setException(new RuntimeException("An exception"));
+        pub.setException(new RuntimeException("An exception"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java
new file mode 100644
index 0000000..4e96ac6
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import io.reactivex.Flowable;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+
+
+public class ExchangeRequestTest extends CamelTestSupport {
+
+    @Test
+    public void testStreamRequest() throws Exception {
+
+        CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+
+        Publisher<Exchange> string = camel.request("data", new DefaultExchange(context));
+
+        Exchange res = Flowable.fromPublisher(string).blockingFirst();
+
+        assertNotNull(res);
+
+        String content = res.getIn().getBody(String.class);
+        assertNotNull(content);
+        assertEquals("123", content);
+    }
+
+    @Test
+    public void testInteraction() throws Exception {
+
+        CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+
+        Integer res = Flowable.fromPublisher(camel.request("plusOne", 1L, Integer.class))
+                .blockingFirst();
+
+        assertNotNull(res);
+        assertEquals(2, res.intValue());
+    }
+
+    @Test
+    public void testMultipleInteractions() throws Exception {
+        CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+
+        Integer sum = Flowable.just(1, 2, 3)
+                .flatMap(e -> camel.request("plusOne", e, Integer.class))
+                .reduce((i, j) -> i + j)
+                .blockingGet();
+
+        assertNotNull(sum);
+        assertEquals(9, sum.intValue());
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("reactive-streams:data")
+                        .setBody().constant("123");
+
+                from("reactive-streams:plusOne")
+                        .setBody().body(Integer.class, b -> b + 1)
+                        .log("Hello ${body}");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index 0301757..5288263 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -104,6 +104,16 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
 
     }
 
+    @Override
+    public Publisher<Exchange> request(String name, Object data) {
+        return null;
+    }
+
+    @Override
+    public <T> Publisher<T> request(String name, Object data, Class<T> type) {
+        return null;
+    }
+
     public String getName() {
         return name;
     }