You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@cxf.apache.org by GitBox <gi...@apache.org> on 2017/11/09 00:51:14 UTC

[GitHub] johnament closed pull request #331: [CXF-7535] Adding client & server support for Project Reactor

johnament closed pull request #331: [CXF-7535] Adding client & server support for Project Reactor
URL: https://github.com/apache/cxf/pull/331
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/.gitignore b/.gitignore
index bc52a625364..b625761a342 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ velocity.log
 bin/
 node_modules/
 derby.log
+.pmdruleset.xml
\ No newline at end of file
diff --git a/parent/pom.xml b/parent/pom.xml
index dbb6b7d97dd..2a25a4c9fc3 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -233,6 +233,7 @@
         <cxf.downloadSources>true</cxf.downloadSources>
         <cxf.pmd.eclipse.ruleset.ext />
         <cxf.resources.base.path />
+        <cxf.reactor.version>3.1.0.RELEASE</cxf.reactor.version>
     </properties>
     <build>
         <resources>
@@ -828,6 +829,11 @@
                 <version>${cxf.rxjava2.version}</version>
             </dependency>
             <dependency>
+                <groupId>io.projectreactor</groupId>
+                <artifactId>reactor-core</artifactId>
+                <version>${cxf.reactor.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>io.netty</groupId>
                 <artifactId>netty</artifactId>
                 <version>${cxf.netty.version}</version>
diff --git a/rt/rs/extensions/reactor/pom.xml b/rt/rs/extensions/reactor/pom.xml
new file mode 100644
index 00000000000..c5fd5e3515f
--- /dev/null
+++ b/rt/rs/extensions/reactor/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>cxf-rt-rs-extension-reactor</artifactId>
+    <packaging>bundle</packaging>
+    <name>Apache CXF JAX-RS Extensions: Project Reactor</name>
+    <description>Apache CXF JAX-RS Extensions: Project Reactor</description>
+    <url>http://cxf.apache.org</url>
+    <parent>
+        <groupId>org.apache.cxf</groupId>
+        <artifactId>cxf-parent</artifactId>
+        <version>3.2.2-SNAPSHOT</version>
+        <relativePath>../../../../parent/pom.xml</relativePath>
+    </parent>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvoker.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvoker.java
new file mode 100644
index 00000000000..d68efcf1b83
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvoker.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.client;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.RxInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface ReactorInvoker extends RxInvoker<Mono<?>> {
+    @Override
+    Mono<Response> get();
+
+    @Override
+    <T> Mono<T> get(Class<T> responseType);
+
+    <T> Flux<T> getFlux(Class<T> responseType);
+
+    @Override
+    <T> Mono<T> get(GenericType<T> responseType);
+
+    @Override
+    Mono<Response> put(Entity<?> entity);
+
+    @Override
+    <T> Mono<T> put(Entity<?> entity, Class<T> clazz);
+
+    <T> Flux<T> putFlux(Entity<?> entity, Class<T> responseType);
+
+    @Override
+    <T> Mono<T> put(Entity<?> entity, GenericType<T> type);
+
+    @Override
+    Mono<Response> post(Entity<?> entity);
+
+    @Override
+    <T> Mono<T> post(Entity<?> entity, Class<T> clazz);
+
+    <T> Flux<T> postFlux(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Mono<T> post(Entity<?> entity, GenericType<T> type);
+
+    @Override
+    Mono<Response> delete();
+
+    @Override
+    <T> Mono<T> delete(Class<T> responseType);
+
+    <T> Flux<T> deleteFlux(Class<T> responseType);
+
+    @Override
+    <T> Mono<T> delete(GenericType<T> responseType);
+
+    @Override
+    Mono<Response> head();
+
+    @Override
+    Mono<Response> options();
+
+    @Override
+    <T> Mono<T> options(Class<T> responseType);
+
+    <T> Flux<T> optionsFlux(Class<T> responseType);
+
+    @Override
+    <T> Mono<T> options(GenericType<T> responseType);
+
+    @Override
+    Mono<Response> trace();
+
+    @Override
+    <T> Mono<T> trace(Class<T> responseType);
+
+    <T> Flux<T> traceFlux(Class<T> responseType);
+
+    @Override
+    <T> Mono<T> trace(GenericType<T> responseType);
+
+    @Override
+    Mono<Response> method(String name);
+
+    @Override
+    <T> Mono<T> method(String name, Class<T> responseType);
+
+    @Override
+    <T> Mono<T> method(String name, GenericType<T> responseType);
+
+    @Override
+    Mono<Response> method(String name, Entity<?> entity);
+
+    @Override
+    <T> Mono<T> method(String name, Entity<?> entity, Class<T> responseType);
+
+    @Override
+    <T> Mono<T> method(String name, Entity<?> entity, GenericType<T> responseType);
+
+    <T> Flux<T> flux(String name, Class<T> responseType);
+
+    <T> Flux<T> flux(String name, Entity<?> entity, Class<T> responseType);
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerImpl.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerImpl.java
new file mode 100644
index 00000000000..04f46282f1d
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerImpl.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.client;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+import org.apache.cxf.jaxrs.client.WebClient;
+import static org.apache.cxf.jaxrs.reactor.client.ReactorUtils.TRACE;
+import static org.apache.cxf.jaxrs.reactor.client.ReactorUtils.toCompletableFuture;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class ReactorInvokerImpl implements ReactorInvoker {
+    private final WebClient webClient;
+    private final ExecutorService executorService;
+
+    ReactorInvokerImpl(WebClient webClient, ExecutorService executorService) {
+        this.webClient = webClient;
+        this.executorService = executorService;
+    }
+
+    @Override
+    public Mono<Response> get() {
+        return method(HttpMethod.GET);
+    }
+
+    @Override
+    public <R> Mono<R> get(Class<R> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public <T> Flux<T> getFlux(Class<T> responseType) {
+        return flux(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public <R> Mono<R> get(GenericType<R> genericType) {
+        return method(HttpMethod.GET, genericType);
+    }
+
+    @Override
+    public Mono<Response> put(Entity<?> entity) {
+        return method(HttpMethod.PUT, entity);
+    }
+
+    @Override
+    public <R> Mono<R> put(Entity<?> entity, Class<R> responseType) {
+        return method(HttpMethod.PUT, entity, responseType); // forward the entity as the PUT body
+    }
+
+    @Override
+    public <T> Flux<T> putFlux(Entity<?> entity, Class<T> responseType) {
+        return flux(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public <R> Mono<R> put(Entity<?> entity, GenericType<R> genericType) {
+        return method(HttpMethod.PUT, entity, genericType);
+    }
+
+    @Override
+    public Mono<Response> post(Entity<?> entity) {
+        return method(HttpMethod.POST, entity);
+    }
+
+    @Override
+    public <R> Mono<R> post(Entity<?> entity, Class<R> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public <T> Flux<T> postFlux(Entity<?> entity, Class<T> responseType) {
+        return flux(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public <R> Mono<R> post(Entity<?> entity, GenericType<R> genericType) {
+        return method(HttpMethod.POST, entity, genericType);
+    }
+
+    @Override
+    public Mono<Response> delete() {
+        return method(HttpMethod.DELETE);
+    }
+
+    @Override
+    public <R> Mono<R> delete(Class<R> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public <T> Flux<T> deleteFlux(Class<T> responseType) {
+        return flux(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public <R> Mono<R> delete(GenericType<R> genericType) {
+        return method(HttpMethod.DELETE, genericType);
+    }
+
+    @Override
+    public Mono<Response> head() {
+        return method(HttpMethod.HEAD);
+    }
+
+    @Override
+    public Mono<Response> options() {
+        return method(HttpMethod.OPTIONS);
+    }
+
+    @Override
+    public <R> Mono<R> options(Class<R> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public <T> Flux<T> optionsFlux(Class<T> responseType) {
+        return flux(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public <R> Mono<R> options(GenericType<R> genericType) {
+        return method(HttpMethod.OPTIONS, genericType);
+    }
+
+    @Override
+    public Mono<Response> trace() {
+        return method(TRACE);
+    }
+
+    @Override
+    public <R> Mono<R> trace(Class<R> responseType) {
+        return method(TRACE, responseType);
+    }
+
+    @Override
+    public <T> Flux<T> traceFlux(Class<T> responseType) {
+        return flux(TRACE, responseType);
+    }
+
+    @Override
+    public <R> Mono<R> trace(GenericType<R> genericType) {
+        return method(TRACE, genericType);
+    }
+
+    @Override
+    public Mono<Response> method(String name) {
+        return method(name, Response.class);
+    }
+
+    @Override
+    public <R> Mono<R> method(String name, Class<R> responseType) {
+        return mono(webClient.async().method(name, responseType));
+    }
+
+    @Override
+    public <R> Mono<R> method(String name, GenericType<R> genericType) {
+        return mono(webClient.async().method(name, genericType));
+    }
+
+    @Override
+    public Mono<Response> method(String name, Entity<?> entity) {
+        return method(name, entity, Response.class);
+    }
+
+    @Override
+    public <R> Mono<R> method(String name, Entity<?> entity, Class<R> responseType) {
+        return mono(webClient.async().method(name, entity, responseType));
+    }
+
+    @Override
+    public <T> Flux<T> flux(String name, Entity<?> entity, Class<T> responseType) {
+        Future<Response> futureResponse = webClient.async().method(name, entity);
+        return Flux.fromIterable(toIterable(futureResponse, responseType));
+    }
+
+    @Override
+    public <T> Flux<T> flux(String name, Class<T> responseType) {
+        Future<Response> futureResponse = webClient.async().method(name);
+        return Flux.fromIterable(toIterable(futureResponse, responseType));
+    }
+
+    @Override
+    public <R> Mono<R> method(String name, Entity<?> entity, GenericType<R> genericType) {
+        return mono(webClient.async().method(name, entity, genericType));
+    }
+
+    private <R> Mono<R> mono(Future<R> future) {
+        return Mono.fromFuture(toCompletableFuture(future, executorService));
+    }
+
+    private <R> Iterable<R> toIterable(Future<Response> futureResponse, Class<R> type) {
+        try {
+            Response response = futureResponse.get();
+            GenericType<List<R>> rGenericType = new GenericType<>(new WrappedType<R>(type));
+            return response.readEntity(rGenericType);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new CompletionException(e);
+        }
+    }
+
+    private class WrappedType<R> implements ParameterizedType {
+        private final Class<R> rClass;
+
+        WrappedType(Class<R> rClass) {
+            this.rClass = rClass;
+        }
+
+        @Override
+        public Type[] getActualTypeArguments() {
+            return new Type[]{rClass };
+        }
+
+        @Override
+        public Type getRawType() {
+            return List.class;
+        }
+
+        @Override
+        public Type getOwnerType() {
+            return null; // List is a top-level type, so it has no owner type
+        }
+    }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerProvider.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerProvider.java
new file mode 100644
index 00000000000..f1553f70bc1
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.client;
+
+import java.util.concurrent.ExecutorService;
+import javax.ws.rs.client.RxInvokerProvider;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.ext.Provider;
+import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
+
+@Provider
+public class ReactorInvokerProvider implements RxInvokerProvider<ReactorInvoker> {
+    @Override
+    public boolean isProviderFor(Class<?> invokerType) {
+        return ReactorInvoker.class.isAssignableFrom(invokerType);
+    }
+
+    @Override
+    public ReactorInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
+        return new ReactorInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), executorService);
+    }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorUtils.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorUtils.java
new file mode 100644
index 00000000000..7de3d614638
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorUtils.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+
+final class ReactorUtils {
+    static final String TRACE = "TRACE";
+    private ReactorUtils() {
+
+    }
+    static <R> CompletableFuture<R> toCompletableFuture(Future<R> future, Executor executor) {
+        Supplier<R> supplier = () -> {
+            try {
+                return future.get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // restore the flag for the executor's thread
+                throw new CompletionException(e);
+            } catch (ExecutionException e) {
+                throw new CompletionException(e);
+            }
+        };
+        return executor == null ? CompletableFuture.supplyAsync(supplier) // common pool fallback
+            : CompletableFuture.supplyAsync(supplier, executor);
+    }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/AbstractSubscriber.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/AbstractSubscriber.java
new file mode 100644
index 00000000000..18950bc073b
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/AbstractSubscriber.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.server;
+
+import java.util.List;
+import javax.ws.rs.container.AsyncResponse;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+public abstract class AbstractSubscriber<T> implements Subscriber<T> {
+
+    private AsyncResponse ar;
+    private Subscription subscription;
+
+    protected AbstractSubscriber(AsyncResponse ar) {
+        this.ar = ar;
+    }
+    public void resume(T response) {
+        ar.resume(response);
+    }
+
+    public void resume(List<T> response) {
+        ar.resume(response);
+    }
+
+    public void resume(StreamingResponse<T> response) {
+        ar.resume(response);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+        ar.resume(t);
+    }
+
+    @Override
+    public void onSubscribe(Subscription inSubscription) {
+        this.subscription = inSubscription;
+        requestAll();
+    }
+
+    @Override
+    public void onNext(T t) {
+        resume(t);
+    }
+
+    @Override
+    public void onComplete() {
+    }
+
+    protected AsyncResponse getAsyncResponse() {
+        return ar;
+    }
+
+    protected Subscription getSubscription() {
+        return subscription;
+    }
+
+    protected void requestNext() {
+        request(1);
+    }
+
+    protected void requestAll() {
+        request(Long.MAX_VALUE);
+    }
+
+    protected final void request(long elements) {
+        this.subscription.request(elements);
+    }
+}
\ No newline at end of file
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/JsonStreamingAsyncSubscriber.java
new file mode 100644
index 00000000000..a5aa7808e13
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/JsonStreamingAsyncSubscriber.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.server;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
+        this(ar, 1000);
+    }
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
+        this(ar, pollTimeout, 0);
+    }
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
+        this(ar, "[", "]", ",", pollTimeout, asyncTimeout);
+    }
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar, String prefix, String suffix, String separator, 
+            long pollTimeout, long asyncTimeout) {
+            super(ar, prefix, suffix, separator, pollTimeout, asyncTimeout);
+    }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
new file mode 100644
index 00000000000..6779e0413b8
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.server;
+
+import java.util.function.Consumer;
+import org.apache.cxf.jaxrs.JAXRSInvoker;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.message.Message;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class ReactorInvoker extends JAXRSInvoker {
+    @Override
+    protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) {
+        if (result instanceof Flux) {
+            final Flux<?> flux = (Flux<?>) result;
+            final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+            flux.doOnNext(asyncResponse::resume)
+                    .doOnError(asyncResponse::resume)
+                    .doOnComplete(asyncResponse::onComplete)
+                    .subscribe();
+            return asyncResponse;
+        } else if (result instanceof Mono) {
+            // mono is only 0 or 1 element, so when something comes in need to complete the async
+            final Mono<?> flux = (Mono<?>) result;
+            final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+            flux.doOnNext((Consumer<Object>) o -> {
+                asyncResponse.resume(o);
+                asyncResponse.onComplete();
+            })
+            .doOnError((Consumer<Throwable>) throwable -> {
+                asyncResponse.resume(throwable);
+                asyncResponse.onComplete();
+            })
+                .subscribe();
+            return asyncResponse;
+        }
+        return null;
+    }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/StreamingAsyncSubscriber.java
new file mode 100644
index 00000000000..7b1efa1080f
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/StreamingAsyncSubscriber.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.server;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+import org.reactivestreams.Subscription;
+
+public class StreamingAsyncSubscriber<T> extends AbstractSubscriber<T> {
+    private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
+    private String openTag;
+    private String closeTag;
+    private String separator;
+    private long pollTimeout;
+    private long asyncTimeout;
+    private volatile boolean completed;
+    private AtomicBoolean firstWriteDone = new AtomicBoolean();
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
+        this(ar, openTag, closeTag, sep, 1000);
+    }
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
+                                    long pollTimeout) {
+        this(ar, openTag, closeTag, sep, pollTimeout, 0);
+    }
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
+                                    long pollTimeout, long asyncTimeout) {
+        super(ar);
+        this.openTag = openTag;
+        this.closeTag = closeTag;
+        this.separator = sep;
+        this.pollTimeout = pollTimeout;
+        this.asyncTimeout = 0; // NOTE(review): the asyncTimeout argument is never stored - confirm intent
+        if (asyncTimeout > 0) { // the argument only arms the AsyncResponse timeout and its handler
+            ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+            ar.setTimeoutHandler(new TimeoutHandlerImpl());
+        }
+    }
+    @Override
+    public void onSubscribe(Subscription subscription) {
+        if (asyncTimeout == 0) {
+            resumeAsyncResponse();
+        }
+        super.onSubscribe(subscription);
+    }
+    @Override
+    public void onComplete() {
+        completed = true;
+    }
+
+    @Override
+    public void onNext(T bean) {
+        if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
+            resumeAsyncResponse();
+        }
+        queue.add(bean);
+        super.requestNext();
+    }
+    protected void resumeAsyncResponse() {
+        resume(new StreamingResponseImpl());
+    }
+
+    private class StreamingResponseImpl implements StreamingResponse<T> {
+
+        @Override
+        public void writeTo(Writer<T> writer) throws IOException {
+            if (openTag != null) {
+                writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
+            }
+            while (!completed || !queue.isEmpty()) {
+                try {
+                    T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
+                    if (bean != null) {
+                        if (firstWriteDone.getAndSet(true)) {
+                            writer.getEntityStream().write(StringUtils.toBytesUTF8(separator));
+                        }
+                        writer.write(bean);
+                    }
+                } catch (InterruptedException ex) {
+                    // ignore
+                }
+            }
+            if (closeTag != null) {
+                writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
+            }
+
+        }
+
+    }
+    public class TimeoutHandlerImpl implements TimeoutHandler {
+
+        @Override
+        public void handleTimeout(AsyncResponse asyncResponse) {
+            if (queue.isEmpty()) {
+                asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                resumeAsyncResponse();
+            }
+
+        }
+
+    }
+}
+
diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/rx/pom.xml
index 03b11479129..4eba6381fb0 100644
--- a/rt/rs/extensions/rx/pom.xml
+++ b/rt/rs/extensions/rx/pom.xml
@@ -58,6 +58,5 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
-    
     </dependencies>
 </project>
diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml
index 2336c4bd072..5a7b89a2a5f 100644
--- a/rt/rs/pom.xml
+++ b/rt/rs/pom.xml
@@ -38,6 +38,7 @@
         <module>extensions/providers</module>
         <module>extensions/search</module>
         <module>extensions/rx</module>
+        <module>extensions/reactor</module>
         <module>security</module>
         <module>sse</module>
     </modules>
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index 75170791bda..d8ea6e66a32 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -60,12 +60,14 @@
         <dependency>
           <groupId>io.reactivex</groupId>
           <artifactId>rxjava</artifactId>
-          <version>${cxf.rxjava.version}</version>
         </dependency>
         <dependency>
           <groupId>io.reactivex.rxjava2</groupId>
           <artifactId>rxjava</artifactId>
-          <version>${cxf.rxjava2.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
         </dependency>
         <dependency>
             <groupId>javax.el</groupId>
@@ -314,6 +316,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-extension-reactor</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-rt-rs-security-cors</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
new file mode 100644
index 00000000000..f6ac2accdec
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.ws.rs.client.ClientBuilder;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.reactor.client.ReactorInvoker;
+import org.apache.cxf.jaxrs.reactor.client.ReactorInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** System tests for the Reactor client invoker against the Flux endpoints served by ReactorServer. */
+public class FluxReactorTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = ReactorServer.PORT;
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly", launchServer(ReactorServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor/flux/textJson";
+        List<HelloWorldBean> collector = new ArrayList<>();
+        // block() waits for the Mono to terminate, so the assertions need no timing-based sleep.
+        ClientBuilder.newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request("application/json")
+                .rx(ReactorInvoker.class)
+                .get(HelloWorldBean.class)
+                .doOnNext(collector::add)
+                .block();
+        assertEquals(1, collector.size());
+        HelloWorldBean bean = collector.get(0);
+        assertEquals("Hello", bean.getGreeting());
+        assertEquals("World", bean.getAudience());
+    }
+    @Test
+    public void testTextJsonImplicitListAsyncStream() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor/flux/textJsonImplicitListAsyncStream";
+        List<HelloWorldBean> holder = new ArrayList<>();
+        // blockLast() returns only after the Flux completes, replacing the former fixed sleep.
+        ClientBuilder.newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request("application/json")
+                .rx(ReactorInvoker.class)
+                .getFlux(HelloWorldBean.class)
+                .doOnNext(holder::add)
+                .blockLast();
+        assertEquals(2, holder.size());
+        assertEquals("Hello", holder.get(0).getGreeting());
+        assertEquals("World", holder.get(0).getAudience());
+        assertEquals("Ciao", holder.get(1).getGreeting());
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
new file mode 100644
index 00000000000..35846366bd3
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import org.apache.cxf.jaxrs.reactor.server.JsonStreamingAsyncSubscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+
+/** Test resource exposing Flux-based endpoints under /reactor/flux. */
+@Path("/reactor/flux")
+public class FluxService {
+    /** Returns a Flux that emits a single default HelloWorldBean. */
+    @GET
+    @Produces("application/json")
+    @Path("textJson")
+    public Flux<HelloWorldBean> getJson() {
+        return Flux.just(new HelloWorldBean());
+    }
+
+    /** Feeds "Hello" and "Ciao" beans to a JsonStreamingAsyncSubscriber on the parallel scheduler. */
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsyncStream")
+    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
+        Flux<HelloWorldBean> greetings = Flux.just("Hello", "Ciao").map(HelloWorldBean::new);
+        greetings.subscribeOn(Schedulers.parallel()).subscribe(new JsonStreamingAsyncSubscriber<>(ar));
+    }
+}
\ No newline at end of file
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/HelloWorldBean.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/HelloWorldBean.java
new file mode 100644
index 00000000000..3e9d3dae4c8
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/HelloWorldBean.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+/** Mutable greeting/audience pair exchanged as JSON by the reactor system tests. */
+public class HelloWorldBean {
+    private String greeting = "Hello";
+    private String audience = "World";
+    public HelloWorldBean() {
+        // nothing to do: the field initialisers supply "Hello" / "World"
+    }
+    public HelloWorldBean(String greeting) {
+        this.greeting = greeting;
+    }
+    public String getGreeting() {
+        return greeting;
+    }
+    public void setGreeting(String greeting) {
+        this.greeting = greeting;
+    }
+    public String getAudience() {
+        return audience;
+    }
+    public void setAudience(String audience) {
+        this.audience = audience;
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
new file mode 100644
index 00000000000..2096969a630
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.ws.rs.client.ClientBuilder;
+import javax.xml.ws.Holder;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.reactor.client.ReactorInvoker;
+import org.apache.cxf.jaxrs.reactor.client.ReactorInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** System tests for the Reactor client invoker against the Mono endpoints served by ReactorServer. */
+public class MonoReactorTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = ReactorServer.PORT;
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly", launchServer(ReactorServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor/mono/textJson";
+        List<HelloWorldBean> holder = new ArrayList<>();
+        // block() waits for the Mono to terminate, so the assertions need no timing-based sleep.
+        ClientBuilder.newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request("application/json")
+                .rx(ReactorInvoker.class)
+                .get(HelloWorldBean.class)
+                .doOnNext(holder::add)
+                .block();
+        assertEquals(1, holder.size());
+        HelloWorldBean bean = holder.get(0);
+        assertEquals("Hello", bean.getGreeting());
+        assertEquals("World", bean.getAudience());
+    }
+
+    @Test
+    public void testTextJsonImplicitListAsyncStream() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor/mono/textJsonImplicitListAsyncStream";
+        Holder<HelloWorldBean> holder = new Holder<>();
+        // block() returns once the response has arrived, so holder.value is set before it is read.
+        ClientBuilder.newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request("application/json")
+                .rx(ReactorInvoker.class)
+                .get(HelloWorldBean.class)
+                .doOnNext(helloWorldBean -> holder.value = helloWorldBean)
+                .block();
+        assertEquals("Hello", holder.value.getGreeting());
+        assertEquals("World", holder.value.getAudience());
+    }
+
+    @Test
+    public void testGetString() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor/mono/textAsync";
+        Holder<String> holder = new Holder<>();
+        // block() replaces the former subscribe() + fixed sleep, making the test deterministic.
+        ClientBuilder.newClient()
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request("text/plain")
+                .rx(ReactorInvoker.class)
+                .get(String.class)
+                .doOnNext(msg -> holder.value = msg)
+                .block();
+        assertEquals("Hello, world!", holder.value);
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
new file mode 100644
index 00000000000..ce50dca6f8f
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import org.apache.cxf.jaxrs.reactor.server.AbstractSubscriber;
+import org.apache.cxf.jaxrs.reactor.server.JsonStreamingAsyncSubscriber;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+/** Test resource exposing Mono-based endpoints under /reactor/mono. */
+@Path("/reactor/mono")
+public class MonoService {
+    /** Returns a Mono that emits a single default HelloWorldBean. */
+    @GET
+    @Produces("application/json")
+    @Path("textJson")
+    public Mono<HelloWorldBean> getJson() {
+        return Mono.just(new HelloWorldBean());
+    }
+    /** Feeds one "Hello" bean to a JsonStreamingAsyncSubscriber on the elastic scheduler. */
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsyncStream")
+    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
+        Mono<HelloWorldBean> greeting = Mono.just("Hello").map(HelloWorldBean::new);
+        greeting.subscribeOn(Schedulers.elastic())
+                .subscribe(new JsonStreamingAsyncSubscriber<>(ar, null, null, null, 1000, 0));
+    }
+    /** Feeds the text "Hello, world!" to a StringAsyncSubscriber bound to the suspended response. */
+    @GET
+    @Produces("text/plain")
+    @Path("textAsync")
+    public void getTextAsync(@Suspended final AsyncResponse ar) {
+        Mono<String> message = Mono.just("Hello, ").map(s -> s + "world!");
+        message.subscribe(new StringAsyncSubscriber(ar));
+    }
+
+    /** AbstractSubscriber specialised for String payloads; declares no behaviour of its own. */
+    private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
+        StringAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
+    }
+}
\ No newline at end of file
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
new file mode 100644
index 00000000000..831d9035233
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import java.util.Collections;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
+import org.apache.cxf.jaxrs.reactor.server.ReactorInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+/** Test server publishing FluxService and MonoService through the server-side ReactorInvoker. */
+public class ReactorServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(ReactorServer.class);
+    org.apache.cxf.endpoint.Server server;
+
+    /** Configures the invoker, providers and resources, then creates the endpoint on PORT. */
+    @Override
+    protected void run() {
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setInvoker(new ReactorInvoker());
+        sf.setProvider(new JacksonJsonProvider());
+        StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
+        streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
+        sf.setProvider(streamProvider);
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(FluxService.class, MonoService.class);
+        sf.setResourceProvider(FluxService.class, new SingletonResourceProvider(new FluxService(), true));
+        sf.setResourceProvider(MonoService.class, new SingletonResourceProvider(new MonoService(), true));
+        sf.setAddress("http://localhost:" + PORT + "/");
+        server = sf.create();
+    }
+    /** Stops and destroys the endpoint; does nothing when run() never created one. */
+    @Override
+    public void tearDown() throws Exception {
+        if (server != null) {
+            server.stop();
+            server.destroy();
+            server = null;
+        }
+    }
+    /** Starts the server standalone and prints a sample URL to try. */
+    public static void main(String[] args) {
+        ReactorServer server = new ReactorServer();
+        System.out.println("Go to http://localhost:" + PORT + "/reactor/flux/textJsonImplicitListAsyncStream");
+        server.start();
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services