You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/01/30 14:39:42 UTC

camel git commit: CAMEL-10612: adding platform tests with reactor-core and refactoring

Repository: camel
Updated Branches:
  refs/heads/master bc6a1b422 -> 20faf979c


CAMEL-10612: adding platform tests with reactor-core and refactoring


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/20faf979
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/20faf979
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/20faf979

Branch: refs/heads/master
Commit: 20faf979c7ba54cc95dcbefd1fbe92887d87b656
Parents: bc6a1b4
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Mon Jan 30 15:39:11 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Mon Jan 30 15:39:11 2017 +0100

----------------------------------------------------------------------
 components/camel-reactive-streams/pom.xml       |   6 +
 ...amelPublisherConversionVerificationTest.java |  68 ----------
 .../streams/CamelPublisherVerificationTest.java |  68 ----------
 ...melSubscriberConversionVerificationTest.java |  62 ----------
 .../CamelSubscriberVerificationTest.java        |  66 ----------
 .../platforms/AbstractPlatformTestSupport.java  | 123 +++++++++++++++++++
 .../platforms/ReactorCorePlatformTest.java      |  43 +++++++
 .../streams/platforms/RxJava2PlatformTest.java  |  43 +++++++
 ...amelPublisherConversionVerificationTest.java |  68 ++++++++++
 .../tck/CamelPublisherVerificationTest.java     |  68 ++++++++++
 ...melSubscriberConversionVerificationTest.java |  62 ++++++++++
 .../tck/CamelSubscriberVerificationTest.java    |  66 ++++++++++
 parent/pom.xml                                  |   1 +
 13 files changed, 480 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/20faf979/components/camel-reactive-streams/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/pom.xml b/components/camel-reactive-streams/pom.xml
index 2357e70..3a69a48 100644
--- a/components/camel-reactive-streams/pom.xml
+++ b/components/camel-reactive-streams/pom.xml
@@ -61,6 +61,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+            <version>${reactor-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-test</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/camel/blob/20faf979/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelPublisherConversionVerificationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelPublisherConversionVerificationTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelPublisherConversionVerificationTest.java
deleted file mode 100644
index a3b1c6b..0000000
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelPublisherConversionVerificationTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.reactive.streams;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.tck.PublisherVerification;
-import org.reactivestreams.tck.TestEnvironment;
-
-public class CamelPublisherConversionVerificationTest extends PublisherVerification<Long> {
-
-    public CamelPublisherConversionVerificationTest() {
-        super(new TestEnvironment(2000L));
-    }
-
-    @Override
-    public Publisher<Long> createPublisher(long l) {
-
-        CamelContext context = new DefaultCamelContext();
-        RouteBuilder builder = new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("timer:tick?delay=500&period=50&repeatCount=" + l)
-                        .setBody().simple("random(1000)")
-                        .to("reactive-streams:prod");
-            }
-        };
-
-        Publisher<Long> pub = CamelReactiveStreams.get(context).getPublisher("prod", Long.class);
-
-        try {
-            builder.addRoutesToCamelContext(context);
-            context.start();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        return pub;
-    }
-
-    @Override
-    public long maxElementsFromPublisher() {
-        // It's an active publisher
-        return publisherUnableToSignalOnComplete(); // == Long.MAX_VALUE == unbounded
-    }
-
-    @Override
-    public Publisher<Long> createFailedPublisher() {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/20faf979/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelPublisherVerificationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelPublisherVerificationTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelPublisherVerificationTest.java
deleted file mode 100644
index a29e05e..0000000
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelPublisherVerificationTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.reactive.streams;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.tck.PublisherVerification;
-import org.reactivestreams.tck.TestEnvironment;
-
-public class CamelPublisherVerificationTest extends PublisherVerification<Exchange> {
-
-    public CamelPublisherVerificationTest() {
-        super(new TestEnvironment(2000L));
-    }
-
-    @Override
-    public Publisher<Exchange> createPublisher(long l) {
-
-        CamelContext context = new DefaultCamelContext();
-        RouteBuilder builder = new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("timer:tick?delay=500&period=50&repeatCount=" + l)
-                        .to("reactive-streams:prod");
-            }
-        };
-
-        Publisher<Exchange> pub = CamelReactiveStreams.get(context).getPublisher("prod");
-
-        try {
-            builder.addRoutesToCamelContext(context);
-            context.start();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        return pub;
-    }
-
-    @Override
-    public long maxElementsFromPublisher() {
-        // It's an active publisher
-        return publisherUnableToSignalOnComplete(); // == Long.MAX_VALUE == unbounded
-    }
-
-    @Override
-    public Publisher<Exchange> createFailedPublisher() {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/20faf979/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelSubscriberConversionVerificationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelSubscriberConversionVerificationTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelSubscriberConversionVerificationTest.java
deleted file mode 100644
index 8f1d157..0000000
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelSubscriberConversionVerificationTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.reactive.streams;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.tck.SubscriberBlackboxVerification;
-import org.reactivestreams.tck.TestEnvironment;
-
-public class CamelSubscriberConversionVerificationTest extends SubscriberBlackboxVerification<Integer> {
-
-    private CamelContext context;
-
-    public CamelSubscriberConversionVerificationTest() {
-        super(new TestEnvironment(2000L));
-    }
-
-    @Override
-    public Subscriber<Integer> createSubscriber() {
-        this.context = new DefaultCamelContext();
-        RouteBuilder builder = new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("reactive-streams:sub?maxInflightExchanges=20")
-                        .to("log:INFO");
-            }
-        };
-
-        Subscriber<Integer> sub = CamelReactiveStreams.get(context).getSubscriber("sub", Integer.class);
-
-        try {
-            builder.addRoutesToCamelContext(context);
-            context.start();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        return sub;
-    }
-
-    @Override
-    public Integer createElement(int element) {
-        return element;
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/20faf979/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelSubscriberVerificationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelSubscriberVerificationTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelSubscriberVerificationTest.java
deleted file mode 100644
index 921f468..0000000
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelSubscriberVerificationTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.reactive.streams;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.DefaultExchange;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.tck.SubscriberBlackboxVerification;
-import org.reactivestreams.tck.TestEnvironment;
-
-public class CamelSubscriberVerificationTest extends SubscriberBlackboxVerification<Exchange> {
-
-    private CamelContext context;
-
-    public CamelSubscriberVerificationTest() {
-        super(new TestEnvironment(2000L));
-    }
-
-    @Override
-    public Subscriber<Exchange> createSubscriber() {
-        this.context = new DefaultCamelContext();
-        RouteBuilder builder = new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("reactive-streams:sub?maxInflightExchanges=20")
-                        .to("log:INFO");
-            }
-        };
-
-        Subscriber<Exchange> sub = CamelReactiveStreams.get(context).getSubscriber("sub");
-
-        try {
-            builder.addRoutesToCamelContext(context);
-            context.start();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        return sub;
-    }
-
-    @Override
-    public Exchange createElement(int element) {
-        Exchange exchange = new DefaultExchange(context);
-        exchange.getIn().setBody(element);
-        return exchange;
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/20faf979/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/AbstractPlatformTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/AbstractPlatformTestSupport.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/AbstractPlatformTestSupport.java
new file mode 100644
index 0000000..b7ff7ff
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/AbstractPlatformTestSupport.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.platforms;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+
+/**
+ *
+ */
+public abstract class AbstractPlatformTestSupport extends CamelTestSupport {
+
+    @Test
+    public void testPublisher() throws Exception {
+
+        int num = 20;
+
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:endpoint")
+                        .to("reactive-streams:integers");
+            }
+        }.addRoutesToCamelContext(context);
+
+        CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+
+        List<Integer> elements = new LinkedList<>();
+        CountDownLatch latch = new CountDownLatch(num);
+
+        this.changeSign(camel.getPublisher("integers", Integer.class), i -> {
+            elements.add(i);
+            latch.countDown();
+        });
+
+        context.start();
+
+        for (int i = 1; i <= num; i++) {
+            template.sendBody("direct:endpoint", i);
+        }
+
+        assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+        for (Integer number : elements) {
+            assertTrue(number < 0);
+        }
+
+    }
+
+
+    @Test
+    public void testSubscriber() throws Exception {
+
+        int num = 20;
+
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("reactive-streams:integers")
+                        .to("mock:endpoint");
+            }
+        }.addRoutesToCamelContext(context);
+
+        CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+
+        List<Integer> elements = new LinkedList<>();
+        for (int i = 1; i <= num; i++) {
+            elements.add(i);
+        }
+
+        changeSign(elements, camel.getSubscriber("integers", Integer.class));
+        context.start();
+
+        MockEndpoint mock = getMockEndpoint("mock:endpoint");
+        mock.expectedMessageCount(num);
+        mock.assertIsSatisfied();
+
+        for (Exchange ex : mock.getExchanges()) {
+            Integer number = ex.getIn().getBody(Integer.class);
+            assertNotNull(number);
+            assertTrue(number < 0);
+        }
+
+    }
+
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    protected abstract void changeSign(Publisher<Integer> data, Consumer<Integer> consume);
+
+    protected abstract void changeSign(Iterable<Integer> data, Subscriber<Integer> camel);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/20faf979/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/ReactorCorePlatformTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/ReactorCorePlatformTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/ReactorCorePlatformTest.java
new file mode 100644
index 0000000..984bb98
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/ReactorCorePlatformTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.platforms;
+
+import java.util.function.Consumer;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+
+import reactor.core.publisher.Flux;
+
+
+public class ReactorCorePlatformTest extends AbstractPlatformTestSupport {
+
+    @Override
+    protected void changeSign(Publisher<Integer> data, Consumer<Integer> consume) {
+        Flux.from(data)
+                .map(i -> -i)
+                .doOnNext(consume)
+                .subscribe();
+    }
+
+    @Override
+    protected void changeSign(Iterable<Integer> data, Subscriber<Integer> camel) {
+        Flux.fromIterable(data)
+                .map(i -> -i)
+                .subscribe(camel);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/20faf979/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/RxJava2PlatformTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/RxJava2PlatformTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/RxJava2PlatformTest.java
new file mode 100644
index 0000000..da68144
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/RxJava2PlatformTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.platforms;
+
+import java.util.function.Consumer;
+
+import io.reactivex.Flowable;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+
+
+public class RxJava2PlatformTest extends AbstractPlatformTestSupport {
+
+    @Override
+    protected void changeSign(Publisher<Integer> data, Consumer<Integer> consume) {
+        Flowable.fromPublisher(data)
+                .map(i -> -i)
+                .doOnNext(consume::accept)
+                .subscribe();
+    }
+
+    @Override
+    protected void changeSign(Iterable<Integer> data, Subscriber<Integer> camel) {
+        Flowable.fromIterable(data)
+                .map(i -> -i)
+                .subscribe(camel);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/20faf979/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherConversionVerificationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherConversionVerificationTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherConversionVerificationTest.java
new file mode 100644
index 0000000..3e3c4ae
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherConversionVerificationTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.tck;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.tck.PublisherVerification;
+import org.reactivestreams.tck.TestEnvironment;
+
+public class CamelPublisherConversionVerificationTest extends PublisherVerification<Long> {
+
+    public CamelPublisherConversionVerificationTest() {
+        super(new TestEnvironment(2000L));
+    }
+
+    @Override
+    public Publisher<Long> createPublisher(long l) {
+
+        CamelContext context = new DefaultCamelContext();
+        RouteBuilder builder = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer:tick?delay=500&period=50&repeatCount=" + l)
+                        .setBody().simple("random(1000)")
+                        .to("reactive-streams:prod");
+            }
+        };
+
+        Publisher<Long> pub = CamelReactiveStreams.get(context).getPublisher("prod", Long.class);
+
+        try {
+            builder.addRoutesToCamelContext(context);
+            context.start();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        return pub;
+    }
+
+    @Override
+    public long maxElementsFromPublisher() {
+        // It's an active publisher
+        return publisherUnableToSignalOnComplete(); // == Long.MAX_VALUE == unbounded
+    }
+
+    @Override
+    public Publisher<Long> createFailedPublisher() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/20faf979/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherVerificationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherVerificationTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherVerificationTest.java
new file mode 100644
index 0000000..72ce04b
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherVerificationTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.tck;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.tck.PublisherVerification;
+import org.reactivestreams.tck.TestEnvironment;
+
+public class CamelPublisherVerificationTest extends PublisherVerification<Exchange> {
+
+    public CamelPublisherVerificationTest() {
+        super(new TestEnvironment(2000L));
+    }
+
+    @Override
+    public Publisher<Exchange> createPublisher(long l) {
+
+        CamelContext context = new DefaultCamelContext();
+        RouteBuilder builder = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer:tick?delay=500&period=50&repeatCount=" + l)
+                        .to("reactive-streams:prod");
+            }
+        };
+
+        Publisher<Exchange> pub = CamelReactiveStreams.get(context).getPublisher("prod");
+
+        try {
+            builder.addRoutesToCamelContext(context);
+            context.start();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        return pub;
+    }
+
+    @Override
+    public long maxElementsFromPublisher() {
+        // It's an active publisher
+        return publisherUnableToSignalOnComplete(); // == Long.MAX_VALUE == unbounded
+    }
+
+    @Override
+    public Publisher<Exchange> createFailedPublisher() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/20faf979/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberConversionVerificationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberConversionVerificationTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberConversionVerificationTest.java
new file mode 100644
index 0000000..b36417c
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberConversionVerificationTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.tck;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.tck.SubscriberBlackboxVerification;
+import org.reactivestreams.tck.TestEnvironment;
+
+public class CamelSubscriberConversionVerificationTest extends SubscriberBlackboxVerification<Integer> {
+
+    private CamelContext context;
+
+    public CamelSubscriberConversionVerificationTest() {
+        super(new TestEnvironment(2000L));
+    }
+
+    @Override
+    public Subscriber<Integer> createSubscriber() {
+        this.context = new DefaultCamelContext();
+        RouteBuilder builder = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("reactive-streams:sub?maxInflightExchanges=20")
+                        .to("log:INFO");
+            }
+        };
+
+        Subscriber<Integer> sub = CamelReactiveStreams.get(context).getSubscriber("sub", Integer.class);
+
+        try {
+            builder.addRoutesToCamelContext(context);
+            context.start();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        return sub;
+    }
+
+    @Override
+    public Integer createElement(int element) {
+        return element;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/20faf979/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberVerificationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberVerificationTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberVerificationTest.java
new file mode 100644
index 0000000..2452f7d
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberVerificationTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.tck;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.DefaultExchange;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.tck.SubscriberBlackboxVerification;
+import org.reactivestreams.tck.TestEnvironment;
+
+public class CamelSubscriberVerificationTest extends SubscriberBlackboxVerification<Exchange> {
+
+    private CamelContext context;
+
+    public CamelSubscriberVerificationTest() {
+        super(new TestEnvironment(2000L));
+    }
+
+    @Override
+    public Subscriber<Exchange> createSubscriber() {
+        this.context = new DefaultCamelContext();
+        RouteBuilder builder = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("reactive-streams:sub?maxInflightExchanges=20")
+                        .to("log:INFO");
+            }
+        };
+
+        Subscriber<Exchange> sub = CamelReactiveStreams.get(context).getSubscriber("sub");
+
+        try {
+            builder.addRoutesToCamelContext(context);
+            context.start();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        return sub;
+    }
+
+    @Override
+    public Exchange createElement(int element) {
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody(element);
+        return exchange;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/20faf979/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 08f0454..8e5118d 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -504,6 +504,7 @@
     <quartz2-version>2.2.3</quartz2-version>
     <quickfixj-version>1.6.3</quickfixj-version>
     <rabbitmq-amqp-client-version>4.0.2</rabbitmq-amqp-client-version>
+    <reactor-version>3.0.4.RELEASE</reactor-version>
     <reflections-bundle-version>0.9.10_3</reflections-bundle-version>
     <regexp-bundle-version>1.4_1</regexp-bundle-version>
     <rest-assured-version>3.0.1</rest-assured-version>