You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/01/23 14:58:13 UTC

[camel-quarkus] branch master updated: Create a Camel ReactiveStreams extension #304

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/master by this push:
     new fc29217  Create a Camel ReactiveStreams extension #304
fc29217 is described below

commit fc29217818caa3d6a477252e5850bf197d05f4ae
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Thu Jan 23 11:04:03 2020 +0100

    Create a Camel ReactiveStreams extension #304
---
 .../pages/list-of-camel-quarkus-extensions.adoc    |   5 +-
 extensions/pom.xml                                 |   1 +
 extensions/reactive-streams/deployment/pom.xml     |  75 ++++++++++++
 .../deployment/ReactiveStreamsProcessor.java       |  92 +++++++++++++++
 .../ReactiveStreamsServiceFactoryBuildItem.java    |  36 ++++++
 extensions/reactive-streams/pom.xml                |  39 +++++++
 extensions/reactive-streams/runtime/pom.xml        |  82 ++++++++++++++
 .../reactive/streams/ReactiveStreamsProducers.java |  51 +++++++++
 .../reactive/streams/ReactiveStreamsRecorder.java  |  91 +++++++++++++++
 .../main/resources/META-INF/quarkus-extension.yaml |  27 +++++
 extensions/readme.adoc                             |   5 +-
 integration-tests/pom.xml                          |   1 +
 integration-tests/reactive-streams/pom.xml         | 126 +++++++++++++++++++++
 .../streams/it/ReactiveStreamsResource.java        |  94 +++++++++++++++
 .../reactive/streams/it/ReactiveStreamsRoute.java  |  29 +++++
 .../streams/it/support/TestSubscriber.java         |  77 +++++++++++++
 .../src/main/resources/application.properties      |  32 ++++++
 .../reactive/streams/it/ReactiveStreamsIT.java     |  24 ++++
 .../reactive/streams/it/ReactiveStreamsTest.java   |  62 ++++++++++
 poms/bom-deployment/pom.xml                        |   5 +
 poms/bom/pom.xml                                   |  10 ++
 21 files changed, 962 insertions(+), 2 deletions(-)

diff --git a/docs/modules/ROOT/pages/list-of-camel-quarkus-extensions.adoc b/docs/modules/ROOT/pages/list-of-camel-quarkus-extensions.adoc
index 7c22ea0..998cb7c 100644
--- a/docs/modules/ROOT/pages/list-of-camel-quarkus-extensions.adoc
+++ b/docs/modules/ROOT/pages/list-of-camel-quarkus-extensions.adoc
@@ -15,7 +15,7 @@ TIP: In case you are missing some Camel feature in the list:
 == Camel Components
 
 // components: START
-Number of Camel components: 60 in 51 JAR artifacts (0 deprecated)
+Number of Camel components: 61 in 52 JAR artifacts (0 deprecated)
 
 [width="100%",cols="4,1,5",options="header"]
 |===
@@ -141,6 +141,9 @@ Number of Camel components: 60 in 51 JAR artifacts (0 deprecated)
 | xref:extensions/platform-http.adoc[Platform HTTP] (camel-quarkus-platform-http) +
 `platform-http:path` | 0.3.0 | HTTP service leveraging existing runtime platform HTTP server
 
+| link:https://camel.apache.org/components/latest/reactive-streams-component.html[Reactive Streams] (camel-quarkus-reactive-streams) +
+`reactive-streams:stream` | 1.0.0 | Reactive Camel using reactive streams
+
 | link:https://camel.apache.org/components/latest/rest-component.html[REST] (camel-quarkus-rest) +
 `rest:method:path:uriTemplate` | 0.2.0 | The rest component is used for either hosting REST services (consumer) or calling external REST services (producer).
 
diff --git a/extensions/pom.xml b/extensions/pom.xml
index f17123b..181534c 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -90,6 +90,7 @@
         <module>paho</module>
         <module>pdf</module>
         <module>platform-http</module>
+        <module>reactive-streams</module>
         <module>rest</module>
         <module>salesforce</module>
         <module>scheduler</module>
diff --git a/extensions/reactive-streams/deployment/pom.xml b/extensions/reactive-streams/deployment/pom.xml
new file mode 100644
index 0000000..2516630
--- /dev/null
+++ b/extensions/reactive-streams/deployment/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.camel.quarkus</groupId>
+        <artifactId>camel-quarkus-reactive-streams-parent</artifactId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>camel-quarkus-reactive-streams-deployment</artifactId>
+    <name>Camel Quarkus :: Reactive Streams :: Deployment</name>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.camel.quarkus</groupId>
+                <artifactId>camel-quarkus-bom-deployment</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-core-deployment</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-reactive-streams</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <annotationProcessorPaths>
+                        <path>
+                            <groupId>io.quarkus</groupId>
+                            <artifactId>quarkus-extension-processor</artifactId>
+                            <version>${quarkus.version}</version>
+                        </path>
+                    </annotationProcessorPaths>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/extensions/reactive-streams/deployment/src/main/java/org/apache/camel/quarkus/component/reactive/streams/deployment/ReactiveStreamsProcessor.java b/extensions/reactive-streams/deployment/src/main/java/org/apache/camel/quarkus/component/reactive/streams/deployment/ReactiveStreamsProcessor.java
new file mode 100644
index 0000000..e5de2a4
--- /dev/null
+++ b/extensions/reactive-streams/deployment/src/main/java/org/apache/camel/quarkus/component/reactive/streams/deployment/ReactiveStreamsProcessor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams.deployment;
+
+import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
+import io.quarkus.arc.deployment.BeanContainerBuildItem;
+import io.quarkus.deployment.annotations.BuildProducer;
+import io.quarkus.deployment.annotations.BuildStep;
+import io.quarkus.deployment.annotations.ExecutionTime;
+import io.quarkus.deployment.annotations.Overridable;
+import io.quarkus.deployment.annotations.Record;
+import io.quarkus.deployment.builditem.FeatureBuildItem;
+import org.apache.camel.quarkus.component.reactive.streams.ReactiveStreamsProducers;
+import org.apache.camel.quarkus.component.reactive.streams.ReactiveStreamsRecorder;
+import org.apache.camel.quarkus.core.Flags;
+import org.apache.camel.quarkus.core.deployment.CamelBeanBuildItem;
+import org.apache.camel.quarkus.core.deployment.CamelContextBuildItem;
+import org.apache.camel.quarkus.core.deployment.CamelServiceFilter;
+import org.apache.camel.quarkus.core.deployment.CamelServiceFilterBuildItem;
+
+class ReactiveStreamsProcessor {
+    private static final String SCHEME = "reactive-streams";
+    private static final String FEATURE = "camel-reactive-streams";
+
+    @BuildStep
+    FeatureBuildItem feature() {
+        return new FeatureBuildItem(FEATURE);
+    }
+
+    @BuildStep
+    CamelServiceFilterBuildItem serviceFilter() {
+        return new CamelServiceFilterBuildItem(CamelServiceFilter.forComponent(SCHEME));
+    }
+
+    @BuildStep(onlyIf = Flags.MainEnabled.class)
+    void beans(BuildProducer<AdditionalBeanBuildItem> beanProducer) {
+        // This extension makes some Camel reactive streams objects available
+        // for injection in order to ease the use of CamelReactiveStreams in CDI.
+        //
+        // For more info about which objects are published, have a look at
+        //     org.apache.camel.quarkus.component.reactive.streams.ReactiveStreamsProducers
+        beanProducer.produce(AdditionalBeanBuildItem.unremovableOf(ReactiveStreamsProducers.class));
+    }
+
+    @Overridable
+    @BuildStep
+    @Record(value = ExecutionTime.STATIC_INIT, optional = true)
+    public ReactiveStreamsServiceFactoryBuildItem defaultReactiveStreamsServiceFactory(
+            ReactiveStreamsRecorder recorder) {
+        return new ReactiveStreamsServiceFactoryBuildItem(recorder.createDefaultReactiveStreamsServiceFactory());
+    }
+
+    @Record(ExecutionTime.STATIC_INIT)
+    @BuildStep
+    CamelBeanBuildItem reactiveStreamsComponent(
+            ReactiveStreamsRecorder recorder,
+            ReactiveStreamsServiceFactoryBuildItem reactiveStreamsServiceFactory) {
+
+        return new CamelBeanBuildItem(
+                SCHEME,
+                "org.apache.camel.component.reactive.streams.ReactiveStreamsComponent",
+                recorder.createReactiveStreamsComponent(reactiveStreamsServiceFactory.getValue()));
+    }
+
+    @Record(ExecutionTime.STATIC_INIT)
+    @BuildStep
+    void publishCamelReactiveStreamsService(
+            BeanContainerBuildItem beanContainer,
+            ReactiveStreamsRecorder recorder,
+            CamelContextBuildItem camelContext,
+            ReactiveStreamsServiceFactoryBuildItem reactiveStreamsServiceFactory) {
+
+        recorder.publishCamelReactiveStreamsService(
+                beanContainer.getValue(),
+                camelContext.getCamelContext(),
+                reactiveStreamsServiceFactory.getValue());
+    }
+}
diff --git a/extensions/reactive-streams/deployment/src/main/java/org/apache/camel/quarkus/component/reactive/streams/deployment/ReactiveStreamsServiceFactoryBuildItem.java b/extensions/reactive-streams/deployment/src/main/java/org/apache/camel/quarkus/component/reactive/streams/deployment/ReactiveStreamsServiceFactoryBuildItem.java
new file mode 100644
index 0000000..f625d77
--- /dev/null
+++ b/extensions/reactive-streams/deployment/src/main/java/org/apache/camel/quarkus/component/reactive/streams/deployment/ReactiveStreamsServiceFactoryBuildItem.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams.deployment;
+
+import io.quarkus.builder.item.SimpleBuildItem;
+import io.quarkus.runtime.RuntimeValue;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsServiceFactory;
+
+/**
+ * Holder for a {@link CamelReactiveStreamsServiceFactory} instance.
+ */
+final class ReactiveStreamsServiceFactoryBuildItem extends SimpleBuildItem {
+    private final RuntimeValue<CamelReactiveStreamsServiceFactory> value;
+
+    public ReactiveStreamsServiceFactoryBuildItem(RuntimeValue<CamelReactiveStreamsServiceFactory> value) {
+        this.value = value;
+    }
+
+    public RuntimeValue<CamelReactiveStreamsServiceFactory> getValue() {
+        return value;
+    }
+}
diff --git a/extensions/reactive-streams/pom.xml b/extensions/reactive-streams/pom.xml
new file mode 100644
index 0000000..fe62d62
--- /dev/null
+++ b/extensions/reactive-streams/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.camel.quarkus</groupId>
+        <artifactId>camel-quarkus-build-parent</artifactId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../../poms/build-parent/pom.xml</relativePath>
+    </parent>
+
+    <artifactId>camel-quarkus-reactive-streams-parent</artifactId>
+    <name>Camel Quarkus :: Reactive Streams</name>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>deployment</module>
+        <module>runtime</module>
+    </modules>
+</project>
diff --git a/extensions/reactive-streams/runtime/pom.xml b/extensions/reactive-streams/runtime/pom.xml
new file mode 100644
index 0000000..6b0b151
--- /dev/null
+++ b/extensions/reactive-streams/runtime/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.camel.quarkus</groupId>
+        <artifactId>camel-quarkus-reactive-streams-parent</artifactId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>camel-quarkus-reactive-streams</artifactId>
+    <name>Camel Quarkus :: Reactive Streams :: Runtime</name>
+
+    <properties>
+        <firstVersion>1.0.0</firstVersion>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.camel.quarkus</groupId>
+                <artifactId>camel-quarkus-bom</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-reactive-streams</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>io.quarkus</groupId>
+                <artifactId>quarkus-bootstrap-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <annotationProcessorPaths>
+                        <path>
+                            <groupId>io.quarkus</groupId>
+                            <artifactId>quarkus-extension-processor</artifactId>
+                            <version>${quarkus.version}</version>
+                        </path>
+                    </annotationProcessorPaths>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/extensions/reactive-streams/runtime/src/main/java/org/apache/camel/quarkus/component/reactive/streams/ReactiveStreamsProducers.java b/extensions/reactive-streams/runtime/src/main/java/org/apache/camel/quarkus/component/reactive/streams/ReactiveStreamsProducers.java
new file mode 100644
index 0000000..28c217b
--- /dev/null
+++ b/extensions/reactive-streams/runtime/src/main/java/org/apache/camel/quarkus/component/reactive/streams/ReactiveStreamsProducers.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams;
+
+import javax.enterprise.inject.Produces;
+import javax.inject.Singleton;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsServiceFactory;
+
+/**
+ * Producers of CamelReactiveStreams related beans that are injectable via CDI.
+ */
+@Singleton
+public class ReactiveStreamsProducers {
+    private volatile CamelContext camelContext;
+    private volatile CamelReactiveStreamsServiceFactory reactiveStreamsServiceFactory;
+
+    public void init(CamelContext camelContext, CamelReactiveStreamsServiceFactory reactiveStreamsServiceFactory) {
+        this.camelContext = camelContext;
+        this.reactiveStreamsServiceFactory = reactiveStreamsServiceFactory;
+    }
+
+    @Singleton
+    @Produces
+    CamelReactiveStreamsServiceFactory camelReactiveStreamsServiceFactory() {
+        return reactiveStreamsServiceFactory;
+    }
+
+    @Singleton
+    @Produces
+    CamelReactiveStreamsService camelReactiveStreamsService() {
+        return CamelReactiveStreams.get(camelContext);
+    }
+}
diff --git a/extensions/reactive-streams/runtime/src/main/java/org/apache/camel/quarkus/component/reactive/streams/ReactiveStreamsRecorder.java b/extensions/reactive-streams/runtime/src/main/java/org/apache/camel/quarkus/component/reactive/streams/ReactiveStreamsRecorder.java
new file mode 100644
index 0000000..2e731d3
--- /dev/null
+++ b/extensions/reactive-streams/runtime/src/main/java/org/apache/camel/quarkus/component/reactive/streams/ReactiveStreamsRecorder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams;
+
+import io.quarkus.arc.runtime.BeanContainer;
+import io.quarkus.runtime.RuntimeValue;
+import io.quarkus.runtime.annotations.Recorder;
+import org.apache.camel.CamelContext;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsServiceFactory;
+import org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsServiceFactory;
+import org.apache.camel.support.service.ServiceHelper;
+
+@Recorder
+public class ReactiveStreamsRecorder {
+    public RuntimeValue<CamelReactiveStreamsServiceFactory> createDefaultReactiveStreamsServiceFactory() {
+        return new RuntimeValue<>(new DefaultCamelReactiveStreamsServiceFactory());
+    }
+
+    public RuntimeValue<ReactiveStreamsComponent> createReactiveStreamsComponent(
+            RuntimeValue<CamelReactiveStreamsServiceFactory> serviceFactory) {
+        return new RuntimeValue<>(new QuarkusReactiveStreamsComponent(serviceFactory.getValue()));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void publishCamelReactiveStreamsService(
+            BeanContainer beanContainer,
+            RuntimeValue<CamelContext> camelContext,
+            RuntimeValue<CamelReactiveStreamsServiceFactory> serviceFactory) {
+
+        // register to the container
+        beanContainer.instance(ReactiveStreamsProducers.class).init(
+                camelContext.getValue(),
+                serviceFactory.getValue());
+    }
+
+    private static class QuarkusReactiveStreamsComponent extends ReactiveStreamsComponent {
+        private final CamelReactiveStreamsServiceFactory reactiveStreamServiceFactory;
+        private final Object lock;
+        private CamelReactiveStreamsService reactiveStreamService;
+
+        public QuarkusReactiveStreamsComponent(CamelReactiveStreamsServiceFactory reactiveStreamServiceFactory) {
+            this.reactiveStreamServiceFactory = reactiveStreamServiceFactory;
+            this.lock = new Object();
+        }
+
+        @Override
+        public CamelReactiveStreamsService getReactiveStreamsService() {
+            synchronized (this.lock) {
+                if (reactiveStreamService == null) {
+                    this.reactiveStreamService = reactiveStreamServiceFactory.newInstance(
+                            getCamelContext(),
+                            getInternalEngineConfiguration());
+
+                    try {
+                        // Start the service and add it to the Camel context to expose managed attributes
+                        getCamelContext().addService(this.reactiveStreamService, true, true);
+                    } catch (Exception e) {
+                        throw new RuntimeCamelException(e);
+                    }
+                }
+            }
+
+            return this.reactiveStreamService;
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            ServiceHelper.stopService(this.reactiveStreamService);
+            this.reactiveStreamService = null;
+
+            super.doStop();
+        }
+    }
+}
diff --git a/extensions/reactive-streams/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/reactive-streams/runtime/src/main/resources/META-INF/quarkus-extension.yaml
new file mode 100644
index 0000000..c38c2ba
--- /dev/null
+++ b/extensions/reactive-streams/runtime/src/main/resources/META-INF/quarkus-extension.yaml
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+---
+name: "Camel Quarkus Reactive Streams"
+description: "Camel Reactive Streams component"
+metadata:
+  keywords:
+  - "camel"
+  - "reactive-streams"
+  guide: "https://quarkus.io/guides/camel"
+  categories:
+  - "integration"
\ No newline at end of file
diff --git a/extensions/readme.adoc b/extensions/readme.adoc
index 9a2f8f3..0793702 100644
--- a/extensions/readme.adoc
+++ b/extensions/readme.adoc
@@ -5,7 +5,7 @@ Apache Camel Quarkus supports the following Camel artifacts as Quarkus Extension
 == Camel Components
 
 // components: START
-Number of Camel components: 60 in 51 JAR artifacts (0 deprecated)
+Number of Camel components: 61 in 52 JAR artifacts (0 deprecated)
 
 [width="100%",cols="4,1,5",options="header"]
 |===
@@ -131,6 +131,9 @@ Number of Camel components: 60 in 51 JAR artifacts (0 deprecated)
 | xref:extensions/platform-http.adoc[Platform HTTP] (camel-quarkus-platform-http) +
 `platform-http:path` | 0.3.0 | HTTP service leveraging existing runtime platform HTTP server
 
+| link:https://camel.apache.org/components/latest/reactive-streams-component.html[Reactive Streams] (camel-quarkus-reactive-streams) +
+`reactive-streams:stream` | 1.0.0 | Reactive Camel using reactive streams
+
 | link:https://camel.apache.org/components/latest/rest-component.html[REST] (camel-quarkus-rest) +
 `rest:method:path:uriTemplate` | 0.2.0 | The rest component is used for either hosting REST services (consumer) or calling external REST services (producer).
 
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 217d8c6..6fe866b 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -186,6 +186,7 @@
         <module>pdf</module>
         <module>platform-http</module>
         <module>platform-http-engine</module>
+        <module>reactive-streams</module>
         <module>salesforce</module>
         <module>scheduler</module>
         <module>seda</module>
diff --git a/integration-tests/reactive-streams/pom.xml b/integration-tests/reactive-streams/pom.xml
new file mode 100644
index 0000000..af1ca05
--- /dev/null
+++ b/integration-tests/reactive-streams/pom.xml
@@ -0,0 +1,126 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.camel.quarkus</groupId>
+        <artifactId>camel-quarkus-integration-tests</artifactId>
+        <version>1.1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-quarkus-integration-test-reactive-streams</artifactId>
+    <name>Camel Quarkus :: Integration Tests :: Reactive Streams</name>
+    <description>Integration tests for Camel Quarkus Reactive Streams extension</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-reactive-streams</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-direct</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-resteasy-jsonb</artifactId>
+        </dependency>
+
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-junit5</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.rest-assured</groupId>
+            <artifactId>rest-assured</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>io.quarkus</groupId>
+                <artifactId>quarkus-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>build</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>native</id>
+            <activation>
+                <property>
+                    <name>native</name>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-failsafe-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>integration-test</goal>
+                                    <goal>verify</goal>
+                                </goals>
+                                <configuration>
+                                    <systemProperties>
+                                        <native.image.path>${project.build.directory}/${project.build.finalName}-runner</native.image.path>
+                                    </systemProperties>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>io.quarkus</groupId>
+                        <artifactId>quarkus-maven-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>native-image</id>
+                                <goals>
+                                    <goal>native-image</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+</project>
diff --git a/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsResource.java b/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsResource.java
new file mode 100644
index 0000000..b264dae
--- /dev/null
+++ b/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsResource.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams.it;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+import javax.json.Json;
+import javax.json.JsonObject;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.FluentProducerTemplate;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsEndpoint;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsServiceFactory;
+import org.apache.camel.quarkus.component.reactive.streams.it.support.TestSubscriber;
+
+/**
+ * JAX-RS resource used by the integration tests to inspect the reactive-streams setup and to exercise the
+ * {@code toUpper} stream.
+ */
+@Path("/reactive-streams")
+@ApplicationScoped
+public class ReactiveStreamsResource {
+    @Inject
+    CamelContext camelContext;
+    @Inject
+    FluentProducerTemplate producerTemplate;
+    @Inject
+    CamelReactiveStreamsService reactiveStreamsService;
+    @Inject
+    CamelReactiveStreamsServiceFactory reactiveStreamsServiceFactory;
+
+    /**
+     * Reports the class names of the reactive-streams component, service and service factory together with the
+     * back-pressure strategy set on the component and on the first reactive-streams endpoint in the registry.
+     *
+     * @return                          the collected values as a JSON object
+     * @throws IllegalArgumentException if the endpoint registry holds no {@link ReactiveStreamsEndpoint}
+     */
+    @Path("/inspect")
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public JsonObject get() {
+        ReactiveStreamsComponent component = camelContext.getComponent("reactive-streams", ReactiveStreamsComponent.class);
+        ReactiveStreamsEndpoint endpoint = camelContext.getEndpointRegistry().values().stream()
+                .filter(ReactiveStreamsEndpoint.class::isInstance)
+                .map(ReactiveStreamsEndpoint.class::cast)
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException("Unable to find an endpoint of type ReactiveStreamsEndpoint"));
+
+        return Json.createObjectBuilder()
+                .add("reactive-streams-component-type", component.getClass().getName())
+                .add("reactive-streams-component-backpressure-strategy", component.getBackpressureStrategy().toString())
+                .add("reactive-streams-endpoint-backpressure-strategy", endpoint.getBackpressureStrategy().toString())
+                .add("reactive-streams-service-type", reactiveStreamsService.getClass().getName())
+                .add("reactive-streams-service-factory-type", reactiveStreamsServiceFactory.getClass().getName())
+                .build();
+    }
+
+    /**
+     * Subscribes to the {@code toUpper} stream, sends the payload to {@code direct:toUpper} and waits up to five
+     * seconds for the first item published on the stream.
+     *
+     * @param  payload              the request body to send through the route
+     * @return                      the item received from the stream, or {@code null} if none arrived in time
+     * @throws InterruptedException if the thread is interrupted while waiting for the item
+     */
+    @Path("/to-upper")
+    @POST
+    @Produces(MediaType.TEXT_PLAIN)
+    public String toUpper(String payload) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicReference<String> result = new AtomicReference<>();
+
+        TestSubscriber<String> subscriber = TestSubscriber.onNext(data -> {
+            result.set(data);
+            latch.countDown();
+        });
+
+        subscriber.setInitiallyRequested(1);
+        reactiveStreamsService.fromStream("toUpper", String.class).subscribe(subscriber);
+
+        try {
+            producerTemplate.to("direct:toUpper").withBody(payload).send();
+
+            latch.await(5, TimeUnit.SECONDS);
+        } finally {
+            // Every request adds a subscriber; cancel it so subscriptions do not pile up on the shared stream.
+            subscriber.cancel();
+        }
+
+        return result.get();
+    }
+}
diff --git a/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsRoute.java b/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsRoute.java
new file mode 100644
index 0000000..c085a93
--- /dev/null
+++ b/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsRoute.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams.it;
+
+import org.apache.camel.builder.RouteBuilder;
+
+public class ReactiveStreamsRoute extends RouteBuilder {
+    // Upper-cases the String body received on direct:toUpper and sends it to the "toUpper" reactive stream.
+    @Override
+    public void configure() throws Exception {
+        from("direct:toUpper").routeId("toUpper")
+                .setBody().body(String.class, text -> text.toUpperCase())
+                .to("reactive-streams:toUpper?backpressureStrategy=BUFFER");
+    }
+}
diff --git a/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/support/TestSubscriber.java b/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/support/TestSubscriber.java
new file mode 100644
index 0000000..53cabe4
--- /dev/null
+++ b/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/support/TestSubscriber.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams.it.support;
+
+import java.util.function.Consumer;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+/**
+ * Minimal {@link Subscriber} for the integration tests: it keeps the {@link Subscription} it is handed and can
+ * request a configurable number of items as soon as it is subscribed. All signal callbacks are no-ops here.
+ *
+ * @param <T> the type of the items received
+ */
+public class TestSubscriber<T> implements Subscriber<T> {
+    // volatile: assigned in onSubscribe and read by request/cancel, which may be called from different threads
+    protected volatile Subscription subscription;
+
+    private long initiallyRequested;
+
+    public TestSubscriber() {
+    }
+
+    /**
+     * Creates a subscriber that passes every received item to the given consumer.
+     *
+     * @param  consumer callback invoked from {@link #onNext(Object)}
+     * @param  <V>      the type of the items received
+     * @return          a new subscriber delegating {@code onNext} to {@code consumer}
+     */
+    public static <V> TestSubscriber<V> onNext(Consumer<V> consumer) {
+        return new TestSubscriber<V>() {
+            @Override
+            public void onNext(V data) {
+                consumer.accept(data);
+            }
+        };
+    }
+
+    public long getInitiallyRequested() {
+        return initiallyRequested;
+    }
+
+    /**
+     * Sets how many items {@link #onSubscribe(Subscription)} requests; zero or less requests nothing.
+     */
+    public void setInitiallyRequested(long initiallyRequested) {
+        this.initiallyRequested = initiallyRequested;
+    }
+
+    /**
+     * Requests more items from the current subscription.
+     *
+     * @param  exchanges            the number of items to request
+     * @throws NullPointerException if {@link #onSubscribe(Subscription)} has not been called yet
+     */
+    public void request(long exchanges) {
+        this.subscription.request(exchanges);
+    }
+
+    /**
+     * Cancels the current subscription; does nothing if {@link #onSubscribe(Subscription)} has not been called yet.
+     */
+    public void cancel() {
+        final Subscription current = this.subscription;
+        if (current != null) {
+            current.cancel();
+        }
+    }
+
+    @Override
+    public void onSubscribe(Subscription subscription) {
+        this.subscription = subscription;
+
+        if (initiallyRequested > 0) {
+            subscription.request(initiallyRequested);
+        }
+    }
+
+    @Override
+    public void onNext(T t) {
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+    }
+
+    @Override
+    public void onComplete() {
+    }
+}
diff --git a/integration-tests/reactive-streams/src/main/resources/application.properties b/integration-tests/reactive-streams/src/main/resources/application.properties
new file mode 100644
index 0000000..0943243
--- /dev/null
+++ b/integration-tests/reactive-streams/src/main/resources/application.properties
@@ -0,0 +1,32 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# Quarkus
+#
+quarkus.log.file.enable = false
+
+#
+# Camel
+#
+camel.context.name = quarkus-camel-example
+
+#
+# Camel :: Reactive Streams
+#
+
+camel.component.reactive-streams.backpressure-strategy = LATEST
\ No newline at end of file
diff --git a/integration-tests/reactive-streams/src/test/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsIT.java b/integration-tests/reactive-streams/src/test/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsIT.java
new file mode 100644
index 0000000..2f25623
--- /dev/null
+++ b/integration-tests/reactive-streams/src/test/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsIT.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams.it;
+
+import io.quarkus.test.junit.NativeImageTest;
+
+@NativeImageTest
+class ReactiveStreamsIT extends ReactiveStreamsTest {
+    // Re-runs the test cases inherited from ReactiveStreamsTest under @NativeImageTest; adds none of its own.
+}
diff --git a/integration-tests/reactive-streams/src/test/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsTest.java b/integration-tests/reactive-streams/src/test/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsTest.java
new file mode 100644
index 0000000..702be1d
--- /dev/null
+++ b/integration-tests/reactive-streams/src/test/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams.it;
+
+import io.quarkus.test.junit.QuarkusTest;
+import io.restassured.RestAssured;
+import io.restassured.path.json.JsonPath;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.is;
+
+@QuarkusTest
+class ReactiveStreamsTest {
+    // Checks the component, endpoint and service values reported by the /reactive-streams/inspect endpoint.
+    @Test
+    public void reactiveStreamsService() {
+        JsonPath json = RestAssured.get("/reactive-streams/inspect")
+                .then()
+                .statusCode(200)
+                .extract()
+                .body()
+                .jsonPath();
+
+        assertThat(json.getString("reactive-streams-component-type")).isEqualTo(
+                "org.apache.camel.quarkus.component.reactive.streams.ReactiveStreamsRecorder$QuarkusReactiveStreamsComponent");
+        assertThat(json.getString("reactive-streams-component-backpressure-strategy")).isEqualTo("LATEST");
+        assertThat(json.getString("reactive-streams-endpoint-backpressure-strategy")).isEqualTo("BUFFER");
+        assertThat(json.getString("reactive-streams-service-type")).isEqualTo(
+                "org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService");
+        assertThat(json.getString("reactive-streams-service-factory-type")).isEqualTo(
+                "org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsServiceFactory");
+    }
+
+    // Posts a lower-case payload to /reactive-streams/to-upper and expects the upper-cased text back.
+    @Test
+    public void subscriber() {
+        final String input = "test";
+        final String expected = input.toUpperCase();
+
+        RestAssured.given()
+                .body(input)
+                .post("/reactive-streams/to-upper")
+                .then()
+                .statusCode(200)
+                .body(is(expected));
+    }
+}
diff --git a/poms/bom-deployment/pom.xml b/poms/bom-deployment/pom.xml
index 1f53a45..e258e26 100644
--- a/poms/bom-deployment/pom.xml
+++ b/poms/bom-deployment/pom.xml
@@ -321,6 +321,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.camel.quarkus</groupId>
+                <artifactId>camel-quarkus-reactive-streams-deployment</artifactId>
+                <version>${camel-quarkus.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.camel.quarkus</groupId>
                 <artifactId>camel-quarkus-rest-deployment</artifactId>
                 <version>${camel-quarkus.version}</version>
             </dependency>
diff --git a/poms/bom/pom.xml b/poms/bom/pom.xml
index a0aa572..ddcef6b 100644
--- a/poms/bom/pom.xml
+++ b/poms/bom/pom.xml
@@ -393,6 +393,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.camel</groupId>
+                <artifactId>camel-reactive-streams</artifactId>
+                <version>${camel.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.camel</groupId>
                 <artifactId>camel-rest</artifactId>
                 <version>${camel.version}</version>
             </dependency>
@@ -766,6 +771,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.camel.quarkus</groupId>
+                <artifactId>camel-quarkus-reactive-streams</artifactId>
+                <version>${camel-quarkus.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.camel.quarkus</groupId>
                 <artifactId>camel-quarkus-rest</artifactId>
                 <version>${camel-quarkus.version}</version>
             </dependency>