You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by jo...@apache.org on 2017/11/09 00:51:11 UTC

[cxf] branch master updated: [CXF-7535] Adding client & server support for Project Reactor (#331)

This is an automated email from the ASF dual-hosted git repository.

johndament pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new f634fb8  [CXF-7535] Adding client & server support for Project Reactor (#331)
f634fb8 is described below

commit f634fb80f0029e6986bc99e8b862778bca1145b0
Author: John Ament <jo...@gmail.com>
AuthorDate: Wed Nov 8 19:51:08 2017 -0500

    [CXF-7535] Adding client & server support for Project Reactor (#331)
---
 .gitignore                                         |   1 +
 parent/pom.xml                                     |   6 +
 rt/rs/extensions/{rx => reactor}/pom.xml           |  28 +--
 .../cxf/jaxrs/reactor/client/ReactorInvoker.java   | 120 ++++++++++
 .../jaxrs/reactor/client/ReactorInvokerImpl.java   | 251 +++++++++++++++++++++
 .../reactor/client/ReactorInvokerProvider.java     |  38 ++++
 .../cxf/jaxrs/reactor/client/ReactorUtils.java     |  47 ++++
 .../jaxrs/reactor/server/AbstractSubscriber.java   |  86 +++++++
 .../server/JsonStreamingAsyncSubscriber.java       |  37 +++
 .../cxf/jaxrs/reactor/server/ReactorInvoker.java   |  56 +++++
 .../reactor/server/StreamingAsyncSubscriber.java   | 126 +++++++++++
 rt/rs/extensions/rx/pom.xml                        |   1 -
 rt/rs/pom.xml                                      |   1 +
 systests/jaxrs/pom.xml                             |  11 +-
 .../cxf/systest/jaxrs/reactor/FluxReactorTest.java |  81 +++++++
 .../cxf/systest/jaxrs/reactor/FluxService.java     |  50 ++++
 .../cxf/systest/jaxrs/reactor/HelloWorldBean.java  |  44 ++++
 .../cxf/systest/jaxrs/reactor/MonoReactorTest.java |  96 ++++++++
 .../cxf/systest/jaxrs/reactor/MonoService.java     |  66 ++++++
 .../cxf/systest/jaxrs/reactor/ReactorServer.java   |  72 ++++++
 20 files changed, 1195 insertions(+), 23 deletions(-)

diff --git a/.gitignore b/.gitignore
index bc52a62..b625761 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ velocity.log
 bin/
 node_modules/
 derby.log
+.pmdruleset.xml
\ No newline at end of file
diff --git a/parent/pom.xml b/parent/pom.xml
index c1745d7..7487b99 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -234,6 +234,7 @@
         <cxf.downloadSources>true</cxf.downloadSources>
         <cxf.pmd.eclipse.ruleset.ext />
         <cxf.resources.base.path />
+        <cxf.reactor.version>3.1.0.RELEASE</cxf.reactor.version>
     </properties>
     <build>
         <resources>
@@ -829,6 +830,11 @@
                 <version>${cxf.rxjava2.version}</version>
             </dependency>
             <dependency>
+                <groupId>io.projectreactor</groupId>
+                <artifactId>reactor-core</artifactId>
+                <version>${cxf.reactor.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>io.netty</groupId>
                 <artifactId>netty</artifactId>
                 <version>${cxf.netty.version}</version>
diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/reactor/pom.xml
similarity index 66%
copy from rt/rs/extensions/rx/pom.xml
copy to rt/rs/extensions/reactor/pom.xml
index 03b1147..c5fd5e3 100644
--- a/rt/rs/extensions/rx/pom.xml
+++ b/rt/rs/extensions/reactor/pom.xml
@@ -17,12 +17,13 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>cxf-rt-rs-extension-rx</artifactId>
+    <artifactId>cxf-rt-rs-extension-reactor</artifactId>
     <packaging>bundle</packaging>
-    <name>Apache CXF JAX-RS Extensions: RxJava</name>
-    <description>Apache CXF JAX-RS Extensions: RxJava</description>
+    <name>Apache CXF JAX-RS Extensions: Project Reactor</name>
+    <description>Apache CXF JAX-RS Extensions: Project Reactor</description>
     <url>http://cxf.apache.org</url>
     <parent>
         <groupId>org.apache.cxf</groupId>
@@ -40,24 +41,11 @@
             <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-rt-rs-client</artifactId>
             <version>${project.version}</version>
-        </dependency> 
-        <dependency>
-          <groupId>io.reactivex</groupId>
-          <artifactId>rxjava</artifactId>
-          <scope>provided</scope>
-          <optional>true</optional>
-        </dependency>
-        <dependency>
-          <groupId>io.reactivex.rxjava2</groupId>
-          <artifactId>rxjava</artifactId>
-          <scope>provided</scope>
-          <optional>true</optional>
         </dependency>
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+            <scope>provided</scope>
         </dependency>
-    
     </dependencies>
 </project>
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvoker.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvoker.java
new file mode 100644
index 0000000..d68efcf
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvoker.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.client;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.RxInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface ReactorInvoker extends RxInvoker<Mono<?>> {
+    @Override
+    Mono<Response> get();
+
+    @Override
+    <T> Mono<T> get(Class<T> responseType);
+
+    <T> Flux<T> getFlux(Class<T> responseType);
+
+    @Override
+    <T> Mono<T> get(GenericType<T> responseType);
+
+    @Override
+    Mono<Response> put(Entity<?> entity);
+
+    @Override
+    <T> Mono<T> put(Entity<?> entity, Class<T> clazz);
+
+    <T> Flux<T> putFlux(Entity<?> entity, Class<T> responseType);
+
+    @Override
+    <T> Mono<T> put(Entity<?> entity, GenericType<T> type);
+
+    @Override
+    Mono<Response> post(Entity<?> entity);
+
+    @Override
+    <T> Mono<T> post(Entity<?> entity, Class<T> clazz);
+
+    <T> Flux<T> postFlux(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Mono<T> post(Entity<?> entity, GenericType<T> type);
+
+    @Override
+    Mono<Response> delete();
+
+    @Override
+    <T> Mono<T> delete(Class<T> responseType);
+
+    <T> Flux<T> deleteFlux(Class<T> responseType);
+
+    @Override
+    <T> Mono<T> delete(GenericType<T> responseType);
+
+    @Override
+    Mono<Response> head();
+
+    @Override
+    Mono<Response> options();
+
+    @Override
+    <T> Mono<T> options(Class<T> responseType);
+
+    <T> Flux<T> optionsFlux(Class<T> responseType);
+
+    @Override
+    <T> Mono<T> options(GenericType<T> responseType);
+
+    @Override
+    Mono<Response> trace();
+
+    @Override
+    <T> Mono<T> trace(Class<T> responseType);
+
+    <T> Flux<T> traceFlux(Class<T> responseType);
+
+    @Override
+    <T> Mono<T> trace(GenericType<T> responseType);
+
+    @Override
+    Mono<Response> method(String name);
+
+    @Override
+    <T> Mono<T> method(String name, Class<T> responseType);
+
+    @Override
+    <T> Mono<T> method(String name, GenericType<T> responseType);
+
+    @Override
+    Mono<Response> method(String name, Entity<?> entity);
+
+    @Override
+    <T> Mono<T> method(String name, Entity<?> entity, Class<T> responseType);
+
+    @Override
+    <T> Mono<T> method(String name, Entity<?> entity, GenericType<T> responseType);
+
+    <T> Flux<T> flux(String name, Class<T> responseType);
+
+    <T> Flux<T> flux(String name, Entity<?> entity, Class<T> responseType);
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerImpl.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerImpl.java
new file mode 100644
index 0000000..04f4628
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerImpl.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.client;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+import org.apache.cxf.jaxrs.client.WebClient;
+import static org.apache.cxf.jaxrs.reactor.client.ReactorUtils.TRACE;
+import static org.apache.cxf.jaxrs.reactor.client.ReactorUtils.toCompletableFuture;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class ReactorInvokerImpl implements ReactorInvoker {
+    private final WebClient webClient;
+    private final ExecutorService executorService;
+
+    ReactorInvokerImpl(WebClient webClient, ExecutorService executorService) {
+        this.webClient = webClient;
+        this.executorService = executorService;
+    }
+
+    @Override
+    public Mono<Response> get() {
+        return method(HttpMethod.GET);
+    }
+
+    @Override
+    public <R> Mono<R> get(Class<R> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public <T> Flux<T> getFlux(Class<T> responseType) {
+        return flux(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public <R> Mono<R> get(GenericType<R> genericType) {
+        return method(HttpMethod.GET, genericType);
+    }
+
+    @Override
+    public Mono<Response> put(Entity<?> entity) {
+        return method(HttpMethod.PUT, entity);
+    }
+
+    @Override
+    public <R> Mono<R> put(Entity<?> entity, Class<R> responseType) {
+        return method(HttpMethod.PUT, entity, responseType); // entity is the PUT request body
+    }
+
+    @Override
+    public <T> Flux<T> putFlux(Entity<?> entity, Class<T> responseType) {
+        return flux(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public <R> Mono<R> put(Entity<?> entity, GenericType<R> genericType) {
+        return method(HttpMethod.PUT, entity, genericType);
+    }
+
+    @Override
+    public Mono<Response> post(Entity<?> entity) {
+        return method(HttpMethod.POST, entity);
+    }
+
+    @Override
+    public <R> Mono<R> post(Entity<?> entity, Class<R> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public <T> Flux<T> postFlux(Entity<?> entity, Class<T> responseType) {
+        return flux(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public <R> Mono<R> post(Entity<?> entity, GenericType<R> genericType) {
+        return method(HttpMethod.POST, entity, genericType);
+    }
+
+    @Override
+    public Mono<Response> delete() {
+        return method(HttpMethod.DELETE);
+    }
+
+    @Override
+    public <R> Mono<R> delete(Class<R> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public <T> Flux<T> deleteFlux(Class<T> responseType) {
+        return flux(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public <R> Mono<R> delete(GenericType<R> genericType) {
+        return method(HttpMethod.DELETE, genericType);
+    }
+
+    @Override
+    public Mono<Response> head() {
+        return method(HttpMethod.HEAD);
+    }
+
+    @Override
+    public Mono<Response> options() {
+        return method(HttpMethod.OPTIONS);
+    }
+
+    @Override
+    public <R> Mono<R> options(Class<R> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public <T> Flux<T> optionsFlux(Class<T> responseType) {
+        return flux(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public <R> Mono<R> options(GenericType<R> genericType) {
+        return method(HttpMethod.OPTIONS, genericType);
+    }
+
+    @Override
+    public Mono<Response> trace() {
+        return method(TRACE);
+    }
+
+    @Override
+    public <R> Mono<R> trace(Class<R> responseType) {
+        return method(TRACE, responseType);
+    }
+
+    @Override
+    public <T> Flux<T> traceFlux(Class<T> responseType) {
+        return flux(TRACE, responseType);
+    }
+
+    @Override
+    public <R> Mono<R> trace(GenericType<R> genericType) {
+        return method(TRACE, genericType);
+    }
+
+    @Override
+    public Mono<Response> method(String name) {
+        return method(name, Response.class);
+    }
+
+    @Override
+    public <R> Mono<R> method(String name, Class<R> responseType) {
+        return mono(webClient.async().method(name, responseType));
+    }
+
+    @Override
+    public <R> Mono<R> method(String name, GenericType<R> genericType) {
+        return mono(webClient.async().method(name, genericType));
+    }
+
+    @Override
+    public Mono<Response> method(String name, Entity<?> entity) {
+        return method(name, entity, Response.class);
+    }
+
+    @Override
+    public <R> Mono<R> method(String name, Entity<?> entity, Class<R> responseType) {
+        return mono(webClient.async().method(name, entity, responseType));
+    }
+
+    @Override
+    public <T> Flux<T> flux(String name, Entity<?> entity, Class<T> responseType) {
+        Future<Response> futureResponse = webClient.async().method(name, entity);
+        return Flux.fromIterable(toIterable(futureResponse, responseType));
+    }
+
+    @Override
+    public <T> Flux<T> flux(String name, Class<T> responseType) {
+        Future<Response> futureResponse = webClient.async().method(name);
+        return Flux.fromIterable(toIterable(futureResponse, responseType));
+    }
+
+    @Override
+    public <R> Mono<R> method(String name, Entity<?> entity, GenericType<R> genericType) {
+        return mono(webClient.async().method(name, entity, genericType));
+    }
+
+    private <R> Mono<R> mono(Future<R> future) {
+        return Mono.fromFuture(toCompletableFuture(future, executorService));
+    }
+
+    private <R> Iterable<R> toIterable(Future<Response> futureResponse, Class<R> type) {
+        try {
+            Response response = futureResponse.get();
+            GenericType<List<R>> rGenericType = new GenericType<>(new WrappedType<R>(type));
+            return response.readEntity(rGenericType);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new CompletionException(e);
+        }
+    }
+
+    private static class WrappedType<R> implements ParameterizedType { // describes List<R>
+        private final Class<R> rClass;
+
+        WrappedType(Class<R> rClass) {
+            this.rClass = rClass;
+        }
+
+        @Override
+        public Type[] getActualTypeArguments() {
+            return new Type[]{rClass };
+        }
+
+        @Override
+        public Type getRawType() {
+            return List.class;
+        }
+
+        @Override
+        public Type getOwnerType() {
+            return null; // List is a top-level type, so it has no owner type
+        }
+    }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerProvider.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerProvider.java
new file mode 100644
index 0000000..f1553f7
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.client;
+
+import java.util.concurrent.ExecutorService;
+import javax.ws.rs.client.RxInvokerProvider;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.ext.Provider;
+import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
+
+@Provider
+public class ReactorInvokerProvider implements RxInvokerProvider<ReactorInvoker> {
+    @Override
+    public boolean isProviderFor(Class<?> invokerType) {
+        return ReactorInvoker.class.isAssignableFrom(invokerType);
+    }
+
+    @Override
+    public ReactorInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
+        return new ReactorInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), executorService);
+    }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorUtils.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorUtils.java
new file mode 100644
index 0000000..7de3d61
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorUtils.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+
+final class ReactorUtils {
+    static final String TRACE = "TRACE";
+    private ReactorUtils() {
+
+    }
+    static <R> CompletableFuture<R> toCompletableFuture(Future<R> future, Executor executor) {
+        Supplier<R> supplier = () -> {
+            try {
+                return future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new CompletionException(e);
+            }
+        };
+        if (executor != null) {
+            return CompletableFuture.supplyAsync(supplier, executor);
+        } else {
+            return CompletableFuture.supplyAsync(supplier);
+        }
+    }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/AbstractSubscriber.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/AbstractSubscriber.java
new file mode 100644
index 0000000..18950bc
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/AbstractSubscriber.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.server;
+
+import java.util.List;
+import javax.ws.rs.container.AsyncResponse;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+public abstract class AbstractSubscriber<T> implements Subscriber<T> {
+
+    private AsyncResponse ar;
+    private Subscription subscription;
+
+    protected AbstractSubscriber(AsyncResponse ar) {
+        this.ar = ar;
+    }
+    public void resume(T response) {
+        ar.resume(response);
+    }
+
+    public void resume(List<T> response) {
+        ar.resume(response);
+    }
+
+    public void resume(StreamingResponse<T> response) {
+        ar.resume(response);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+        ar.resume(t);
+    }
+
+    @Override
+    public void onSubscribe(Subscription inSubscription) {
+        this.subscription = inSubscription;
+        requestAll();
+    }
+
+    @Override
+    public void onNext(T t) {
+        resume(t);
+    }
+
+    @Override
+    public void onComplete() {
+    }
+
+    protected AsyncResponse getAsyncResponse() {
+        return ar;
+    }
+
+    protected Subscription getSubscription() {
+        return subscription;
+    }
+
+    protected void requestNext() {
+        request(1);
+    }
+
+    protected void requestAll() {
+        request(Long.MAX_VALUE);
+    }
+
+    protected final void request(long elements) {
+        this.subscription.request(elements);
+    }
+}
\ No newline at end of file
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/JsonStreamingAsyncSubscriber.java
new file mode 100644
index 0000000..a5aa780
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/JsonStreamingAsyncSubscriber.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.server;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
+        this(ar, 1000);
+    }
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
+        this(ar, pollTimeout, 0);
+    }
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
+        this(ar, "[", "]", ",", pollTimeout, asyncTimeout);
+    }
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar, String prefix, String suffix, String separator, 
+            long pollTimeout, long asyncTimeout) {
+            super(ar, prefix, suffix, separator, pollTimeout, asyncTimeout);
+    }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
new file mode 100644
index 0000000..6779e04
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.server;
+
+import java.util.function.Consumer;
+import org.apache.cxf.jaxrs.JAXRSInvoker;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.message.Message;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class ReactorInvoker extends JAXRSInvoker {
+    @Override
+    protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) {
+        if (result instanceof Flux) {
+            final Flux<?> flux = (Flux<?>) result;
+            final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+            flux.doOnNext(asyncResponse::resume)
+                    .doOnError(asyncResponse::resume)
+                    .doOnComplete(asyncResponse::onComplete)
+                    .subscribe();
+            return asyncResponse;
+        } else if (result instanceof Mono) {
+            // mono is only 0 or 1 element, so when something comes in need to complete the async
+            final Mono<?> flux = (Mono<?>) result;
+            final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+            flux.doOnNext((Consumer<Object>) o -> {
+                asyncResponse.resume(o);
+                asyncResponse.onComplete();
+            })
+            .doOnError((Consumer<Throwable>) throwable -> {
+                asyncResponse.resume(throwable);
+                asyncResponse.onComplete();
+            })
+                .subscribe();
+            return asyncResponse;
+        }
+        return null;
+    }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/StreamingAsyncSubscriber.java
new file mode 100644
index 0000000..7b1efa1
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/StreamingAsyncSubscriber.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.server;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+import org.reactivestreams.Subscription;
+
+/**
+ * Subscriber that queues every item it receives and writes the queued items out through a
+ * {@link StreamingResponse}, optionally wrapped in an opening and a closing tag and joined
+ * with a separator.
+ * <p>
+ * With an async timeout of {@code 0} the response is resumed as soon as the subscription is
+ * established. With a positive async timeout the response stays suspended until the first
+ * item arrives or the timeout handler finds something to send.
+ *
+ * @param <T> the type of the streamed items
+ */
+public class StreamingAsyncSubscriber<T> extends AbstractSubscriber<T> {
+    private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
+    private String openTag;
+    private String closeTag;
+    private String separator;
+    private long pollTimeout;
+    private long asyncTimeout;
+    private volatile boolean completed;
+    private AtomicBoolean firstWriteDone = new AtomicBoolean();
+
+    /**
+     * Creates a subscriber with a poll timeout of 1000 milliseconds and no async timeout.
+     *
+     * @param ar the asynchronous response to stream into
+     * @param openTag text written before the first item, or {@code null} for none
+     * @param closeTag text written after the last item, or {@code null} for none
+     * @param sep text written between two items, or {@code null} for none
+     */
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
+        this(ar, openTag, closeTag, sep, 1000);
+    }
+
+    /**
+     * Creates a subscriber with the given poll timeout and no async timeout.
+     *
+     * @param ar the asynchronous response to stream into
+     * @param openTag text written before the first item, or {@code null} for none
+     * @param closeTag text written after the last item, or {@code null} for none
+     * @param sep text written between two items, or {@code null} for none
+     * @param pollTimeout how long, in milliseconds, a single wait for the next item lasts
+     */
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
+                                    long pollTimeout) {
+        this(ar, openTag, closeTag, sep, pollTimeout, 0);
+    }
+
+    /**
+     * Creates a fully configured subscriber.
+     *
+     * @param ar the asynchronous response to stream into
+     * @param openTag text written before the first item, or {@code null} for none
+     * @param closeTag text written after the last item, or {@code null} for none
+     * @param sep text written between two items, or {@code null} for none
+     * @param pollTimeout how long, in milliseconds, a single wait for the next item lasts
+     * @param asyncTimeout timeout, in milliseconds, set on the asynchronous response;
+     *        a value of {@code 0} or less leaves the response timeout untouched
+     */
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
+                                    long pollTimeout, long asyncTimeout) {
+        super(ar);
+        this.openTag = openTag;
+        this.closeTag = closeTag;
+        this.separator = sep;
+        this.pollTimeout = pollTimeout;
+        // Keep the configured value: onSubscribe, onNext and the timeout handler all branch on it.
+        this.asyncTimeout = asyncTimeout;
+        if (asyncTimeout > 0) {
+            ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+            ar.setTimeoutHandler(new TimeoutHandlerImpl());
+        }
+    }
+
+    /**
+     * Resumes the response immediately when no async timeout is configured, then delegates
+     * to the parent implementation.
+     */
+    @Override
+    public void onSubscribe(Subscription subscription) {
+        if (asyncTimeout == 0) {
+            resumeAsyncResponse();
+        }
+        super.onSubscribe(subscription);
+    }
+
+    /**
+     * Marks the stream as finished so the writer loop ends once the queue is drained.
+     */
+    @Override
+    public void onComplete() {
+        completed = true;
+    }
+
+    /**
+     * Queues the item and requests the next one. When an async timeout is configured and the
+     * response is still suspended, the response is resumed first.
+     */
+    @Override
+    public void onNext(T bean) {
+        if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
+            resumeAsyncResponse();
+        }
+        queue.add(bean);
+        super.requestNext();
+    }
+
+    /**
+     * Resumes the asynchronous response with a streaming entity backed by the queue.
+     */
+    protected void resumeAsyncResponse() {
+        resume(new StreamingResponseImpl());
+    }
+
+    /**
+     * Streaming entity that drains the queue until the stream has completed and no item is left.
+     */
+    private class StreamingResponseImpl implements StreamingResponse<T> {
+
+        @Override
+        public void writeTo(Writer<T> writer) throws IOException {
+            if (openTag != null) {
+                writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
+            }
+            while (!completed || !queue.isEmpty()) {
+                final T bean;
+                try {
+                    bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException ex) {
+                    // Restore the flag and stop: swallowing the interrupt would keep this
+                    // thread looping for as long as the publisher has not completed.
+                    Thread.currentThread().interrupt();
+                    throw new IOException("Interrupted while streaming the response", ex);
+                }
+                if (bean != null) {
+                    // getAndSet must run for every item; the separator itself is optional,
+                    // just like the opening and closing tags.
+                    if (firstWriteDone.getAndSet(true) && separator != null) {
+                        writer.getEntityStream().write(StringUtils.toBytesUTF8(separator));
+                    }
+                    writer.write(bean);
+                }
+            }
+            if (closeTag != null) {
+                writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
+            }
+
+        }
+
+    }
+
+    /**
+     * Timeout handler that keeps the response suspended while there is nothing to send and
+     * resumes it otherwise.
+     */
+    public class TimeoutHandlerImpl implements TimeoutHandler {
+
+        @Override
+        public void handleTimeout(AsyncResponse asyncResponse) {
+            if (queue.isEmpty() && !completed) {
+                // Nothing queued yet and the publisher is still active: wait another round.
+                asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                // Either items are waiting or the stream finished empty; in both cases the
+                // response has to be released instead of being re-armed.
+                resumeAsyncResponse();
+            }
+
+        }
+
+    }
+}
+
diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/rx/pom.xml
index 03b1147..4eba638 100644
--- a/rt/rs/extensions/rx/pom.xml
+++ b/rt/rs/extensions/rx/pom.xml
@@ -58,6 +58,5 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
-    
     </dependencies>
 </project>
diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml
index 2336c4b..5a7b89a 100644
--- a/rt/rs/pom.xml
+++ b/rt/rs/pom.xml
@@ -38,6 +38,7 @@
         <module>extensions/providers</module>
         <module>extensions/search</module>
         <module>extensions/rx</module>
+        <module>extensions/reactor</module>
         <module>security</module>
         <module>sse</module>
     </modules>
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index 7517079..d8ea6e6 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -60,12 +60,14 @@
         <dependency>
           <groupId>io.reactivex</groupId>
           <artifactId>rxjava</artifactId>
-          <version>${cxf.rxjava.version}</version>
         </dependency>
         <dependency>
           <groupId>io.reactivex.rxjava2</groupId>
           <artifactId>rxjava</artifactId>
-          <version>${cxf.rxjava2.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
         </dependency>
         <dependency>
             <groupId>javax.el</groupId>
@@ -314,6 +316,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-extension-reactor</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-rt-rs-security-cors</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
new file mode 100644
index 0000000..f6ac2ac
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.ws.rs.client.ClientBuilder;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.reactor.client.ReactorInvoker;
+import org.apache.cxf.jaxrs.reactor.client.ReactorInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Client-side checks against the Flux based endpoints exposed by {@link ReactorServer}.
+ */
+public class FluxReactorTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = ReactorServer.PORT;
+
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        final boolean launched = launchServer(ReactorServer.class, true);
+        assertTrue("server did not launch correctly", launched);
+        createStaticBus();
+    }
+
+    /**
+     * Builds a reactive JSON client invoker for the given path on the test server.
+     */
+    private static ReactorInvoker jsonInvoker(String path) {
+        return ClientBuilder.newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target("http://localhost:" + PORT + path)
+                .request("application/json")
+                .rx(ReactorInvoker.class);
+    }
+
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        final List<HelloWorldBean> received = new ArrayList<>();
+        jsonInvoker("/reactor/flux/textJson")
+                .get(HelloWorldBean.class)
+                .doOnNext(received::add)
+                .subscribe();
+        // give the asynchronous call time to deliver its result
+        Thread.sleep(500);
+        assertEquals(1, received.size());
+        final HelloWorldBean first = received.get(0);
+        assertEquals("Hello", first.getGreeting());
+        assertEquals("World", first.getAudience());
+    }
+
+    @Test
+    public void testTextJsonImplicitListAsyncStream() throws Exception {
+        final List<HelloWorldBean> received = new ArrayList<>();
+        jsonInvoker("/reactor/flux/textJsonImplicitListAsyncStream")
+                .getFlux(HelloWorldBean.class)
+                .doOnNext(received::add)
+                .subscribe();
+        // give the asynchronous call time to deliver its results
+        Thread.sleep(500);
+        assertEquals(2, received.size());
+        final HelloWorldBean first = received.get(0);
+        assertEquals("Hello", first.getGreeting());
+        assertEquals("World", first.getAudience());
+        final HelloWorldBean second = received.get(1);
+        assertEquals("Ciao", second.getGreeting());
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
new file mode 100644
index 0000000..3584636
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import org.apache.cxf.jaxrs.reactor.server.JsonStreamingAsyncSubscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * Test resource whose endpoints are backed by {@link Flux} publishers.
+ */
+@Path("/reactor/flux")
+public class FluxService {
+
+    /**
+     * Returns a single default bean as a Flux.
+     */
+    @GET
+    @Produces("application/json")
+    @Path("textJson")
+    public Flux<HelloWorldBean> getJson() {
+        final HelloWorldBean bean = new HelloWorldBean();
+        return Flux.just(bean);
+    }
+
+    /**
+     * Streams two beans into the suspended response through a JSON streaming subscriber.
+     */
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsyncStream")
+    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
+        final Flux<HelloWorldBean> beans = Flux.just("Hello", "Ciao").map(HelloWorldBean::new);
+        final JsonStreamingAsyncSubscriber<HelloWorldBean> subscriber =
+            new JsonStreamingAsyncSubscriber<>(ar);
+        beans.subscribeOn(Schedulers.parallel()).subscribe(subscriber);
+    }
+}
\ No newline at end of file
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/HelloWorldBean.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/HelloWorldBean.java
new file mode 100644
index 0000000..3e9d3da
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/HelloWorldBean.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+/**
+ * Simple mutable bean carrying a greeting and the audience it is addressed to.
+ */
+public class HelloWorldBean {
+    private static final String DEFAULT_GREETING = "Hello";
+    private static final String DEFAULT_AUDIENCE = "World";
+
+    private String greeting;
+    private String audience;
+
+    /**
+     * Creates a bean with the greeting "Hello" and the audience "World".
+     */
+    public HelloWorldBean() {
+        this(DEFAULT_GREETING);
+    }
+
+    /**
+     * Creates a bean with the given greeting and the audience "World".
+     *
+     * @param greeting the greeting to carry
+     */
+    public HelloWorldBean(String greeting) {
+        this.greeting = greeting;
+        this.audience = DEFAULT_AUDIENCE;
+    }
+
+    public String getAudience() {
+        return audience;
+    }
+
+    public void setAudience(String audience) {
+        this.audience = audience;
+    }
+
+    public String getGreeting() {
+        return greeting;
+    }
+
+    public void setGreeting(String greeting) {
+        this.greeting = greeting;
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
new file mode 100644
index 0000000..2096969
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.ws.rs.client.ClientBuilder;
+import javax.xml.ws.Holder;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.reactor.client.ReactorInvoker;
+import org.apache.cxf.jaxrs.reactor.client.ReactorInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Client-side checks against the Mono based endpoints exposed by {@link ReactorServer}.
+ */
+public class MonoReactorTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = ReactorServer.PORT;
+
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        final boolean launched = launchServer(ReactorServer.class, true);
+        assertTrue("server did not launch correctly", launched);
+        createStaticBus();
+    }
+
+    /**
+     * Builds a reactive JSON client invoker for the given path on the test server.
+     */
+    private static ReactorInvoker jsonInvoker(String path) {
+        return ClientBuilder.newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target("http://localhost:" + PORT + path)
+                .request("application/json")
+                .rx(ReactorInvoker.class);
+    }
+
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        final List<HelloWorldBean> received = new ArrayList<>();
+        jsonInvoker("/reactor/mono/textJson")
+                .get(HelloWorldBean.class)
+                .doOnNext(received::add)
+                .subscribe();
+        // give the asynchronous call time to deliver its result
+        Thread.sleep(500);
+        assertEquals(1, received.size());
+        final HelloWorldBean first = received.get(0);
+        assertEquals("Hello", first.getGreeting());
+        assertEquals("World", first.getAudience());
+    }
+
+    @Test
+    public void testTextJsonImplicitListAsyncStream() throws Exception {
+        final Holder<HelloWorldBean> result = new Holder<>();
+        jsonInvoker("/reactor/mono/textJsonImplicitListAsyncStream")
+                .get(HelloWorldBean.class)
+                .doOnNext(bean -> result.value = bean)
+                .subscribe();
+        // give the asynchronous call time to deliver its result
+        Thread.sleep(500);
+        assertEquals("Hello", result.value.getGreeting());
+        assertEquals("World", result.value.getAudience());
+    }
+
+    @Test
+    public void testGetString() throws Exception {
+        final Holder<String> result = new Holder<>();
+        final String address = "http://localhost:" + PORT + "/reactor/mono/textAsync";
+        ClientBuilder.newClient()
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request("text/plain")
+                .rx(ReactorInvoker.class)
+                .get(String.class)
+                .doOnNext(text -> result.value = text)
+                .subscribe();
+
+        // give the asynchronous call time to deliver its result
+        Thread.sleep(500);
+        assertEquals("Hello, world!", result.value);
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
new file mode 100644
index 0000000..ce50dca
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import org.apache.cxf.jaxrs.reactor.server.AbstractSubscriber;
+import org.apache.cxf.jaxrs.reactor.server.JsonStreamingAsyncSubscriber;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * Test resource whose endpoints are backed by {@link Mono} publishers.
+ */
+@Path("/reactor/mono")
+public class MonoService {
+
+    /**
+     * Returns a single default bean as a Mono.
+     */
+    @GET
+    @Produces("application/json")
+    @Path("textJson")
+    public Mono<HelloWorldBean> getJson() {
+        final HelloWorldBean bean = new HelloWorldBean();
+        return Mono.just(bean);
+    }
+
+    /**
+     * Streams one bean into the suspended response through a JSON streaming subscriber
+     * configured without tags or separator.
+     */
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsyncStream")
+    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
+        final Mono<HelloWorldBean> bean = Mono.just("Hello").map(HelloWorldBean::new);
+        final JsonStreamingAsyncSubscriber<HelloWorldBean> subscriber =
+            new JsonStreamingAsyncSubscriber<>(ar, null, null, null, 1000, 0);
+        bean.subscribeOn(Schedulers.elastic()).subscribe(subscriber);
+    }
+
+    /**
+     * Builds a text greeting reactively and hands it to a plain string subscriber.
+     */
+    @GET
+    @Produces("text/plain")
+    @Path("textAsync")
+    public void getTextAsync(@Suspended final AsyncResponse ar) {
+        final Mono<String> greeting = Mono.just("Hello, ").map(prefix -> prefix + "world!");
+        greeting.subscribe(new StringAsyncSubscriber(ar));
+
+    }
+
+    /**
+     * String subscriber that adds nothing to the inherited behaviour.
+     */
+    private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
+        StringAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
+    }
+}
\ No newline at end of file
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
new file mode 100644
index 0000000..831d903
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import java.util.Collections;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
+import org.apache.cxf.jaxrs.reactor.server.ReactorInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+/**
+ * Test server publishing {@link FluxService} and {@link MonoService} through a
+ * {@link ReactorInvoker}.
+ */
+public class ReactorServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(ReactorServer.class);
+
+    org.apache.cxf.endpoint.Server server;
+
+    /**
+     * Creates the streaming response provider restricted to JSON output.
+     */
+    private static StreamingResponseProvider<HelloWorldBean> jsonStreamProvider() {
+        StreamingResponseProvider<HelloWorldBean> provider =
+            new StreamingResponseProvider<HelloWorldBean>();
+        provider.setProduceMediaTypes(Collections.singletonList("application/json"));
+        return provider;
+    }
+
+    @Override
+    protected void run() {
+        // Make sure default JSONProvider is not loaded
+        Bus defaultBus = BusFactory.getDefaultBus();
+        defaultBus.setProperty("skip.default.json.provider.registration", true);
+
+        JAXRSServerFactoryBean factory = new JAXRSServerFactoryBean();
+        factory.setInvoker(new ReactorInvoker());
+        factory.setProvider(new JacksonJsonProvider());
+        factory.setProvider(jsonStreamProvider());
+        factory.getOutInterceptors().add(new LoggingOutInterceptor());
+        factory.setResourceClasses(FluxService.class, MonoService.class);
+
+        SingletonResourceProvider fluxResource = new SingletonResourceProvider(new FluxService(), true);
+        factory.setResourceProvider(FluxService.class, fluxResource);
+        SingletonResourceProvider monoResource = new SingletonResourceProvider(new MonoService(), true);
+        factory.setResourceProvider(MonoService.class, monoResource);
+
+        factory.setAddress("http://localhost:" + PORT + "/");
+        server = factory.create();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        server.stop();
+        server.destroy();
+        server = null;
+    }
+
+    /**
+     * Prints the URL of the streaming endpoint and starts the server.
+     */
+    public static void main(String[] args) {
+        ReactorServer reactorServer = new ReactorServer();
+        System.out.println("Go to http://localhost:" + PORT + "/reactor/flux/textJsonImplicitListAsyncStream");
+        reactorServer.start();
+
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@cxf.apache.org" <co...@cxf.apache.org>'].