You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/02/02 16:06:42 UTC

[2/2] camel git commit: CAMEL-10612: allow a stream to request data to Camel

CAMEL-10612: allow a stream to request data to Camel


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5108cb51
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5108cb51
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5108cb51

Branch: refs/heads/master
Commit: 5108cb512f326ff690337f9e1fcfdaffa089601b
Parents: 1885db2
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Thu Feb 2 10:58:14 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Thu Feb 2 17:06:20 2017 +0100

----------------------------------------------------------------------
 .../api/CamelReactiveStreamsService.java        |  22 ++
 .../engine/CamelReactiveStreamsServiceImpl.java |  55 +++-
 .../streams/engine/CamelSubscriber.java         |   5 +
 .../streams/engine/DelayedMonoPublisher.java    | 188 ++++++++++++++
 .../streams/DelayedMonoPublisherTest.java       | 257 +++++++++++++++++++
 .../reactive/streams/ExchangeRequestTest.java   |  89 +++++++
 .../support/ReactiveStreamsTestService.java     |  10 +
 7 files changed, 625 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index b865a48..3a7acd4 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -77,6 +77,28 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      */
     <T> Subscriber<T> getSubscriber(String name, Class<T> type);
 
+    /**
+     * Pushes the given data into the specified Camel stream and returns a Publisher (mono) holding
+     * the resulting exchange or an error.
+     *
+     * @param name the stream name
+     * @param data the data to push
+     * @return a publisher with the resulting exchange
+     */
+    Publisher<Exchange> request(String name, Object data);
+
+    /**
+     * Pushes the given data into the specified Camel stream and returns a Publisher (mono) holding
+     * the exchange output or an error.
+     *
+     * @param name the stream name
+     * @param data the data to push
+     * @param type the type to which the output should be converted
+     * @param <T> the generic type of the resulting Publisher
+     * @return a publisher with the resulting data
+     */
+    <T> Publisher<T> request(String name, Object data, Class<T> type);
+
     /*
      * Methods for Camel producers.
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index 7b285a6..edffb9e 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
@@ -29,10 +30,14 @@ import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsServi
 import org.apache.camel.component.reactive.streams.api.DispatchCallback;
 import org.apache.camel.component.reactive.streams.util.ConvertingPublisher;
 import org.apache.camel.component.reactive.streams.util.ConvertingSubscriber;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.spi.Synchronization;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 
-
+/**
+ * The default implementation of the reactive streams service.
+ */
 public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsService {
 
     private CamelContext context;
@@ -100,6 +105,54 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
         getPayloadPublisher(name).publish(payload);
     }
 
+    @Override
+    public Publisher<Exchange> request(String name, Object data) {
+        // An Exchange supplied by the caller is forwarded untouched.
+        if (data instanceof Exchange) {
+            return doRequest(name, (Exchange) data);
+        }
+
+        // Any other payload becomes the body of a new InOut exchange.
+        Exchange wrapper = new DefaultExchange(context);
+        wrapper.setPattern(ExchangePattern.InOut);
+        wrapper.getIn().setBody(data);
+        return doRequest(name, wrapper);
+    }
+
+    @Override
+    public <T> Publisher<T> request(String name, Object data, Class<T> type) {
+        return new ConvertingPublisher<>(request(name, data), type); // wraps the exchange publisher together with the target type
+    }
+
+    protected Publisher<Exchange> doRequest(String name, Exchange data) {
+        // The request can only be served by a consumer currently attached to the stream.
+        ReactiveStreamsConsumer target = getSubscriber(name).getConsumer();
+        if (target == null) {
+            throw new IllegalStateException("No consumers attached to the stream " + name);
+        }
+
+        DelayedMonoPublisher<Exchange> result = new DelayedMonoPublisher<>(this.workerPool);
+
+        // Hand the outcome of the exchange over to the publisher when processing ends.
+        data.addOnCompletion(new Synchronization() {
+            @Override
+            public void onComplete(Exchange exchange) {
+                result.setData(exchange);
+            }
+
+            @Override
+            public void onFailure(Exchange exchange) {
+                Throwable cause = exchange.getException();
+                // setException rejects null, so fall back to a generic failure.
+                result.setException(cause != null ? cause : new IllegalStateException("Unknown Exception"));
+            }
+        });
+
+        // The outcome is reported through the synchronization above, so this callback does nothing.
+        target.process(data, doneSync -> {
+        });
+        return result;
+    }
 
     private CamelPublisher getPayloadPublisher(String name) {
         synchronized (this) {

http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index eda2863..098d0ca 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -64,6 +64,10 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
         refill();
     }
 
+    public synchronized ReactiveStreamsConsumer getConsumer() { // locks the same monitor (this) as detachConsumer()
+        return consumer; // may be null: detachConsumer() clears the field
+    }
+
     public void detachConsumer() {
         synchronized (this) {
             this.consumer = null;
@@ -86,6 +90,7 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
         }
 
         if (!allowed) {
+            LOG.warn("There is another active subscription: cancelled");
             subscription.cancel();
         } else {
             refill();

http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
new file mode 100644
index 0000000..ef8ece1
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.engine;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Publish a single item as soon as it's available.
+ */
+public class DelayedMonoPublisher<T> implements Publisher<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DelayedMonoPublisher.class);
+
+    private ExecutorService workerPool;
+
+    private volatile T data;
+
+    private volatile Throwable exception;
+
+    private List<MonoSubscription> subscriptions = new CopyOnWriteArrayList<>();
+
+    private AtomicBoolean flushing = new AtomicBoolean(false);
+
+    public DelayedMonoPublisher(ExecutorService workerPool) {
+        this.workerPool = workerPool;
+    }
+
+    @Override
+    public void subscribe(Subscriber<? super T> subscriber) {
+        Objects.requireNonNull(subscriber, "subscriber must not be null");
+        MonoSubscription sub = new MonoSubscription(subscriber);
+        subscriptions.add(sub); // registered before onSubscribe so that flush passes can see it
+        subscriber.onSubscribe(sub);
+        flushCycle(); // data or an exception may already be set: try to deliver right away
+    }
+
+    public T getData() {
+        return data; // null until setData() has stored a value
+    }
+
+    public void setData(T data) {
+        Objects.requireNonNull(data, "data must be not null");
+        if (this.data != null) {
+            throw new IllegalStateException("data has already been set");
+        }
+        if (this.exception != null) {
+            throw new IllegalStateException("an exception has already been set");
+        }
+        this.data = data; // the single value handed to every subscriber
+        flushCycle(); // serve subscribers that have already requested it
+    }
+
+    public Throwable getException() {
+        return exception; // null until setException() has stored a failure
+    }
+
+    public void setException(Throwable exception) {
+        Objects.requireNonNull(exception, "exception must be not null");
+        if (this.data != null) {
+            throw new IllegalStateException("data has already been set");
+        }
+        if (this.exception != null) {
+            throw new IllegalStateException("an exception has already been set");
+        }
+        this.exception = exception; // the failure handed to every subscriber
+        flushCycle(); // signal subscribers that have already requested the result
+    }
+
+    private void flushCycle() { // schedules one delivery pass on the worker pool unless one is already running
+        boolean notRunning = flushing.compareAndSet(false, true);
+        // NOTE(review): if execute() rejects the task, the flag is never reset and no later pass can start
+        if (notRunning) {
+            workerPool.execute(() -> {
+                try {
+                    List<MonoSubscription> completed = new LinkedList<>();
+                    for (MonoSubscription sub : this.subscriptions) {
+                        sub.flush(); // signals the subscriber only if the subscription is ready
+                        if (sub.isTerminated()) {
+                            completed.add(sub);
+                        }
+                    }
+                    this.subscriptions.removeAll(completed); // terminated subscriptions are dropped from the list
+                } finally {
+                    flushing.set(false);
+                }
+
+                boolean runAgain = false; // a subscription may have become ready while this pass was running
+                for (MonoSubscription sub : this.subscriptions) {
+                    if (sub.isReady()) {
+                        runAgain = true;
+                        break;
+                    }
+                }
+                if (runAgain) {
+                    flushCycle();
+                }
+
+            });
+        }
+    }
+
+    private final class MonoSubscription implements Subscription {
+        // Set on cancel(), on an invalid request (rule 3.9) or when flush() starts delivering.
+        private volatile boolean terminated;
+        // Set by the first positive request(n); one unit of demand is all a single-item publisher needs.
+        private volatile boolean requested;
+
+        private Subscriber<? super T> subscriber;
+
+        private MonoSubscription(Subscriber<? super T> subscriber) {
+            this.subscriber = subscriber;
+        }
+        // Registers demand (any positive amount is enough) and triggers a delivery pass.
+        @Override
+        public void request(long l) {
+            if (terminated) {
+                return; // Reactive Streams rule 3.6: requests after cancellation must be no-ops
+            }
+            // Rule 3.9: a non-positive amount is reported through onError and ends the subscription.
+            if (l <= 0) {
+                subscriber.onError(new IllegalArgumentException("3.9"));
+                synchronized (this) {
+                    terminated = true;
+                }
+            } else {
+                synchronized (this) {
+                    requested = true;
+                }
+            }
+            flushCycle(); // the result may already be set: without this pass, late demand was never served
+        }
+        // Delivers the stored value or exception to the subscriber if the subscription is ready.
+        public void flush() {
+            synchronized (this) {
+                if (!isReady()) {
+                    return;
+                }
+
+                terminated = true; // claimed under the lock so that a single caller delivers
+            }
+            // Subscriber callbacks run outside the lock.
+            if (data != null) {
+                subscriber.onNext(data);
+                subscriber.onComplete();
+            } else {
+                subscriber.onError(exception);
+            }
+        }
+
+        public boolean isTerminated() {
+            return terminated;
+        }
+        // Ready means: still active, demand signalled, and a value or an exception is available.
+        public boolean isReady() {
+            return !terminated && requested && (data != null || exception != null);
+        }
+
+        @Override
+        public synchronized void cancel() {
+            terminated = true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
new file mode 100644
index 0000000..011225e
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import java.util.LinkedList;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import io.reactivex.Flowable;
+
+import org.apache.camel.component.reactive.streams.engine.DelayedMonoPublisher;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class DelayedMonoPublisherTest {
+    // Exercises DelayedMonoPublisher through RxJava Flowable subscribers on a 3-thread pool.
+    private ExecutorService service;
+
+    @Before
+    public void init() {
+        service = new ScheduledThreadPoolExecutor(3);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        service.shutdown();
+        service.awaitTermination(1, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testAlreadyAvailable() throws Exception {
+        // The value is set before anyone subscribes.
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+        pub.setData(5);
+
+        LinkedList<Integer> data = new LinkedList<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        Flowable.fromPublisher(pub)
+                .doOnNext(data::add)
+                .doOnComplete(latch::countDown)
+                .subscribe();
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+        assertEquals(1, data.size());
+        assertEquals(5, data.get(0).intValue());
+    }
+
+    @Test
+    public void testExceptionAlreadyAvailable() throws Exception {
+        // The exception is set before anyone subscribes.
+        Exception ex = new RuntimeException("An exception");
+
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+        pub.setException(ex);
+
+        LinkedList<Throwable> exceptions = new LinkedList<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        Flowable.fromPublisher(pub)
+                .doOnError(exceptions::add)
+                .doOnError(e -> latch.countDown())
+                .subscribe();
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+        assertEquals(1, exceptions.size());
+        assertEquals(ex, exceptions.get(0));
+    }
+
+    @Test
+    public void testAvailableSoon() throws Exception {
+        // The value is set right after subscribing, with only a Thread.yield() in between.
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+        LinkedList<Integer> data = new LinkedList<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        Flowable.fromPublisher(pub)
+                .doOnNext(data::add)
+                .doOnComplete(latch::countDown)
+                .subscribe();
+
+        Thread.yield();
+        pub.setData(5);
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+        assertEquals(1, data.size());
+        assertEquals(5, data.get(0).intValue());
+    }
+
+    @Test
+    public void testAvailableLater() throws Exception {
+        // The value is set 200 ms after subscribing.
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+        LinkedList<Integer> data = new LinkedList<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        Flowable.fromPublisher(pub)
+                .doOnNext(data::add)
+                .doOnComplete(latch::countDown)
+                .subscribe();
+
+        Thread.sleep(200);
+        pub.setData(5);
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+        assertEquals(1, data.size());
+        assertEquals(5, data.get(0).intValue());
+    }
+
+    @Test
+    public void testMultipleSubscribers() throws Exception {
+        // Two subscribers are attached before the value is set; both must receive it.
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+        ConcurrentLinkedDeque<Integer> data = new ConcurrentLinkedDeque<>();
+        CountDownLatch latch = new CountDownLatch(2);
+
+        Flowable.fromPublisher(pub)
+                .doOnNext(data::add)
+                .doOnComplete(latch::countDown)
+                .subscribe();
+
+        Flowable.fromPublisher(pub)
+                .doOnNext(data::add)
+                .doOnComplete(latch::countDown)
+                .subscribe();
+
+        Thread.sleep(200);
+        pub.setData(5);
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+        assertEquals(2, data.size());
+        for (Integer n : data) {
+            assertEquals(5, n.intValue());
+        }
+    }
+
+    @Test
+    public void testMultipleSubscribersMixedArrival() throws Exception {
+        // One subscriber arrives before the value is set and one after; both must receive it.
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+        ConcurrentLinkedDeque<Integer> data = new ConcurrentLinkedDeque<>();
+        CountDownLatch latch = new CountDownLatch(2);
+
+        Flowable.fromPublisher(pub)
+                .doOnNext(data::add)
+                .doOnComplete(latch::countDown)
+                .subscribe();
+
+        Thread.sleep(200);
+        pub.setData(5);
+
+        Flowable.fromPublisher(pub)
+                .doOnNext(data::add)
+                .doOnComplete(latch::countDown)
+                .subscribe();
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+        assertEquals(2, data.size());
+        for (Integer n : data) {
+            assertEquals(5, n.intValue());
+        }
+    }
+
+    @Test
+    public void testMultipleSubscribersMixedArrivalException() throws Exception {
+        // Same as the mixed-arrival case, but the publisher ends with an exception.
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+        Exception ex = new RuntimeException("An exception");
+
+        ConcurrentLinkedDeque<Throwable> exceptions = new ConcurrentLinkedDeque<>();
+        CountDownLatch latch = new CountDownLatch(2);
+
+        Flowable.fromPublisher(pub)
+                .doOnError(exceptions::add)
+                .doOnError(e -> latch.countDown())
+                .subscribe();
+
+        Thread.sleep(200);
+        pub.setException(ex);
+
+        Flowable.fromPublisher(pub)
+                .doOnError(exceptions::add)
+                .doOnError(e -> latch.countDown())
+                .subscribe();
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+        assertEquals(2, exceptions.size());
+        for (Throwable t : exceptions) {
+            assertEquals(ex, t);
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testDataOrExceptionAllowed() throws Exception {
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+        Exception ex = new RuntimeException("An exception");
+        pub.setException(ex);
+        pub.setData(1); // rejected: an exception has already been set
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testDataOrExceptionAllowed2() throws Exception {
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+        pub.setData(1);
+        Exception ex = new RuntimeException("An exception");
+        pub.setException(ex); // rejected: data has already been set
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testOnlyOneDataAllowed() throws Exception {
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+        pub.setData(1);
+        pub.setData(2); // rejected: data can be set only once
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testOnlyOneExceptionAllowed() throws Exception {
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+        pub.setException(new RuntimeException("An exception"));
+        pub.setException(new RuntimeException("An exception")); // rejected: an exception can be set only once
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java
new file mode 100644
index 0000000..4e96ac6
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import io.reactivex.Flowable;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+
+
+public class ExchangeRequestTest extends CamelTestSupport {
+    // Checks request(...) against the two routes defined in createRouteBuilder().
+    @Test
+    public void testStreamRequest() throws Exception {
+        // An empty exchange sent to the "data" stream must come back with the body set by the route.
+        CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+
+        Publisher<Exchange> string = camel.request("data", new DefaultExchange(context));
+
+        Exchange res = Flowable.fromPublisher(string).blockingFirst();
+
+        assertNotNull(res);
+
+        String content = res.getIn().getBody(String.class);
+        assertNotNull(content);
+        assertEquals("123", content);
+    }
+
+    @Test
+    public void testInteraction() throws Exception {
+        // A Long payload goes through the "plusOne" route and the result is read back as an Integer.
+        CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+
+        Integer res = Flowable.fromPublisher(camel.request("plusOne", 1L, Integer.class))
+                .blockingFirst();
+
+        assertNotNull(res);
+        assertEquals(2, res.intValue());
+    }
+
+    @Test
+    public void testMultipleInteractions() throws Exception {
+        CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+        // Each element triggers its own request: (1 + 1) + (2 + 1) + (3 + 1) = 9.
+        Integer sum = Flowable.just(1, 2, 3)
+                .flatMap(e -> camel.request("plusOne", e, Integer.class))
+                .reduce((i, j) -> i + j)
+                .blockingGet();
+
+        assertNotNull(sum);
+        assertEquals(9, sum.intValue());
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("reactive-streams:data")
+                        .setBody().constant("123");
+                // Reads the body as an Integer, adds one and logs the result.
+                from("reactive-streams:plusOne")
+                        .setBody().body(Integer.class, b -> b + 1)
+                        .log("Hello ${body}");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index 0301757..5288263 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -104,6 +104,16 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
 
     }
 
+    @Override
+    public Publisher<Exchange> request(String name, Object data) {
+        return null; // stub: this test service always returns null here
+    }
+
+    @Override
+    public <T> Publisher<T> request(String name, Object data, Class<T> type) {
+        return null; // stub: this test service always returns null here
+    }
+
     public String getName() {
         return name;
     }