You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by jo...@apache.org on 2017/11/09 00:51:11 UTC
[cxf] branch master updated: [CXF-7535] Adding client & server
support for Project Reactor (#331)
This is an automated email from the ASF dual-hosted git repository.
johndament pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new f634fb8 [CXF-7535] Adding client & server support for Project Reactor (#331)
f634fb8 is described below
commit f634fb80f0029e6986bc99e8b862778bca1145b0
Author: John Ament <jo...@gmail.com>
AuthorDate: Wed Nov 8 19:51:08 2017 -0500
[CXF-7535] Adding client & server support for Project Reactor (#331)
---
.gitignore | 1 +
parent/pom.xml | 6 +
rt/rs/extensions/{rx => reactor}/pom.xml | 28 +--
.../cxf/jaxrs/reactor/client/ReactorInvoker.java | 120 ++++++++++
.../jaxrs/reactor/client/ReactorInvokerImpl.java | 251 +++++++++++++++++++++
.../reactor/client/ReactorInvokerProvider.java | 38 ++++
.../cxf/jaxrs/reactor/client/ReactorUtils.java | 47 ++++
.../jaxrs/reactor/server/AbstractSubscriber.java | 86 +++++++
.../server/JsonStreamingAsyncSubscriber.java | 37 +++
.../cxf/jaxrs/reactor/server/ReactorInvoker.java | 56 +++++
.../reactor/server/StreamingAsyncSubscriber.java | 126 +++++++++++
rt/rs/extensions/rx/pom.xml | 1 -
rt/rs/pom.xml | 1 +
systests/jaxrs/pom.xml | 11 +-
.../cxf/systest/jaxrs/reactor/FluxReactorTest.java | 81 +++++++
.../cxf/systest/jaxrs/reactor/FluxService.java | 50 ++++
.../cxf/systest/jaxrs/reactor/HelloWorldBean.java | 44 ++++
.../cxf/systest/jaxrs/reactor/MonoReactorTest.java | 96 ++++++++
.../cxf/systest/jaxrs/reactor/MonoService.java | 66 ++++++
.../cxf/systest/jaxrs/reactor/ReactorServer.java | 72 ++++++
20 files changed, 1195 insertions(+), 23 deletions(-)
diff --git a/.gitignore b/.gitignore
index bc52a62..b625761 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ velocity.log
bin/
node_modules/
derby.log
+.pmdruleset.xml
\ No newline at end of file
diff --git a/parent/pom.xml b/parent/pom.xml
index c1745d7..7487b99 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -234,6 +234,7 @@
<cxf.downloadSources>true</cxf.downloadSources>
<cxf.pmd.eclipse.ruleset.ext />
<cxf.resources.base.path />
+ <cxf.reactor.version>3.1.0.RELEASE</cxf.reactor.version>
</properties>
<build>
<resources>
@@ -829,6 +830,11 @@
<version>${cxf.rxjava2.version}</version>
</dependency>
<dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ <version>${cxf.reactor.version}</version>
+ </dependency>
+ <dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>${cxf.netty.version}</version>
diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/reactor/pom.xml
similarity index 66%
copy from rt/rs/extensions/rx/pom.xml
copy to rt/rs/extensions/reactor/pom.xml
index 03b1147..c5fd5e3 100644
--- a/rt/rs/extensions/rx/pom.xml
+++ b/rt/rs/extensions/reactor/pom.xml
@@ -17,12 +17,13 @@
specific language governing permissions and limitations
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
- <artifactId>cxf-rt-rs-extension-rx</artifactId>
+ <artifactId>cxf-rt-rs-extension-reactor</artifactId>
<packaging>bundle</packaging>
- <name>Apache CXF JAX-RS Extensions: RxJava</name>
- <description>Apache CXF JAX-RS Extensions: RxJava</description>
+ <name>Apache CXF JAX-RS Extensions: Project Reactor</name>
+ <description>Apache CXF JAX-RS Extensions: Project Reactor</description>
<url>http://cxf.apache.org</url>
<parent>
<groupId>org.apache.cxf</groupId>
@@ -40,24 +41,11 @@
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-client</artifactId>
<version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>io.reactivex</groupId>
- <artifactId>rxjava</artifactId>
- <scope>provided</scope>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>io.reactivex.rxjava2</groupId>
- <artifactId>rxjava</artifactId>
- <scope>provided</scope>
- <optional>true</optional>
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ <scope>provided</scope>
</dependency>
-
</dependencies>
</project>
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvoker.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvoker.java
new file mode 100644
index 0000000..d68efcf
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvoker.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.client;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.RxInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface ReactorInvoker extends RxInvoker<Mono<?>> {
+ @Override
+ Mono<Response> get();
+
+ @Override
+ <T> Mono<T> get(Class<T> responseType);
+
+ <T> Flux<T> getFlux(Class<T> responseType);
+
+ @Override
+ <T> Mono<T> get(GenericType<T> responseType);
+
+ @Override
+ Mono<Response> put(Entity<?> entity);
+
+ @Override
+ <T> Mono<T> put(Entity<?> entity, Class<T> clazz);
+
+ <T> Flux<T> putFlux(Entity<?> entity, Class<T> responseType);
+
+ @Override
+ <T> Mono<T> put(Entity<?> entity, GenericType<T> type);
+
+ @Override
+ Mono<Response> post(Entity<?> entity);
+
+ @Override
+ <T> Mono<T> post(Entity<?> entity, Class<T> clazz);
+
+ <T> Flux<T> postFlux(Entity<?> entity, Class<T> clazz);
+
+ @Override
+ <T> Mono<T> post(Entity<?> entity, GenericType<T> type);
+
+ @Override
+ Mono<Response> delete();
+
+ @Override
+ <T> Mono<T> delete(Class<T> responseType);
+
+ <T> Flux<T> deleteFlux(Class<T> responseType);
+
+ @Override
+ <T> Mono<T> delete(GenericType<T> responseType);
+
+ @Override
+ Mono<Response> head();
+
+ @Override
+ Mono<Response> options();
+
+ @Override
+ <T> Mono<T> options(Class<T> responseType);
+
+ <T> Flux<T> optionsFlux(Class<T> responseType);
+
+ @Override
+ <T> Mono<T> options(GenericType<T> responseType);
+
+ @Override
+ Mono<Response> trace();
+
+ @Override
+ <T> Mono<T> trace(Class<T> responseType);
+
+ <T> Flux<T> traceFlux(Class<T> responseType);
+
+ @Override
+ <T> Mono<T> trace(GenericType<T> responseType);
+
+ @Override
+ Mono<Response> method(String name);
+
+ @Override
+ <T> Mono<T> method(String name, Class<T> responseType);
+
+ @Override
+ <T> Mono<T> method(String name, GenericType<T> responseType);
+
+ @Override
+ Mono<Response> method(String name, Entity<?> entity);
+
+ @Override
+ <T> Mono<T> method(String name, Entity<?> entity, Class<T> responseType);
+
+ @Override
+ <T> Mono<T> method(String name, Entity<?> entity, GenericType<T> responseType);
+
+ <T> Flux<T> flux(String name, Class<T> responseType);
+
+ <T> Flux<T> flux(String name, Entity<?> entity, Class<T> responseType);
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerImpl.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerImpl.java
new file mode 100644
index 0000000..04f4628
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerImpl.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.client;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+import org.apache.cxf.jaxrs.client.WebClient;
+import static org.apache.cxf.jaxrs.reactor.client.ReactorUtils.TRACE;
+import static org.apache.cxf.jaxrs.reactor.client.ReactorUtils.toCompletableFuture;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class ReactorInvokerImpl implements ReactorInvoker {
+ private final WebClient webClient;
+ private final ExecutorService executorService;
+
+ ReactorInvokerImpl(WebClient webClient, ExecutorService executorService) {
+ this.webClient = webClient;
+ this.executorService = executorService;
+ }
+
+ @Override
+ public Mono<Response> get() {
+ return method(HttpMethod.GET);
+ }
+
+ @Override
+ public <R> Mono<R> get(Class<R> responseType) {
+ return method(HttpMethod.GET, responseType);
+ }
+
+ @Override
+ public <T> Flux<T> getFlux(Class<T> responseType) {
+ return flux(HttpMethod.GET, responseType);
+ }
+
+ @Override
+ public <R> Mono<R> get(GenericType<R> genericType) {
+ return method(HttpMethod.GET, genericType);
+ }
+
+ @Override
+ public Mono<Response> put(Entity<?> entity) {
+ return method(HttpMethod.PUT, entity);
+ }
+
+ @Override
+ public <R> Mono<R> put(Entity<?> entity, Class<R> responseType) {
+ return method(HttpMethod.PUT, responseType);
+ }
+
+ @Override
+ public <T> Flux<T> putFlux(Entity<?> entity, Class<T> responseType) {
+ return flux(HttpMethod.PUT, entity, responseType);
+ }
+
+ @Override
+ public <R> Mono<R> put(Entity<?> entity, GenericType<R> genericType) {
+ return method(HttpMethod.PUT, entity, genericType);
+ }
+
+ @Override
+ public Mono<Response> post(Entity<?> entity) {
+ return method(HttpMethod.POST, entity);
+ }
+
+ @Override
+ public <R> Mono<R> post(Entity<?> entity, Class<R> responseType) {
+ return method(HttpMethod.POST, entity, responseType);
+ }
+
+ @Override
+ public <T> Flux<T> postFlux(Entity<?> entity, Class<T> responseType) {
+ return flux(HttpMethod.POST, entity, responseType);
+ }
+
+ @Override
+ public <R> Mono<R> post(Entity<?> entity, GenericType<R> genericType) {
+ return method(HttpMethod.POST, entity, genericType);
+ }
+
+ @Override
+ public Mono<Response> delete() {
+ return method(HttpMethod.DELETE);
+ }
+
+ @Override
+ public <R> Mono<R> delete(Class<R> responseType) {
+ return method(HttpMethod.DELETE, responseType);
+ }
+
+ @Override
+ public <T> Flux<T> deleteFlux(Class<T> responseType) {
+ return flux(HttpMethod.DELETE, responseType);
+ }
+
+ @Override
+ public <R> Mono<R> delete(GenericType<R> genericType) {
+ return method(HttpMethod.DELETE, genericType);
+ }
+
+ @Override
+ public Mono<Response> head() {
+ return method(HttpMethod.HEAD);
+ }
+
+ @Override
+ public Mono<Response> options() {
+ return method(HttpMethod.OPTIONS);
+ }
+
+ @Override
+ public <R> Mono<R> options(Class<R> responseType) {
+ return method(HttpMethod.OPTIONS, responseType);
+ }
+
+ @Override
+ public <T> Flux<T> optionsFlux(Class<T> responseType) {
+ return flux(HttpMethod.OPTIONS, responseType);
+ }
+
+ @Override
+ public <R> Mono<R> options(GenericType<R> genericType) {
+ return method(HttpMethod.OPTIONS, genericType);
+ }
+
+ @Override
+ public Mono<Response> trace() {
+ return method(TRACE);
+ }
+
+ @Override
+ public <R> Mono<R> trace(Class<R> responseType) {
+ return method(TRACE, responseType);
+ }
+
+ @Override
+ public <T> Flux<T> traceFlux(Class<T> responseType) {
+ return flux(TRACE, responseType);
+ }
+
+ @Override
+ public <R> Mono<R> trace(GenericType<R> genericType) {
+ return method(TRACE, genericType);
+ }
+
+ @Override
+ public Mono<Response> method(String name) {
+ return method(name, Response.class);
+ }
+
+ @Override
+ public <R> Mono<R> method(String name, Class<R> responseType) {
+ return mono(webClient.async().method(name, responseType));
+ }
+
+ @Override
+ public <R> Mono<R> method(String name, GenericType<R> genericType) {
+ return mono(webClient.async().method(name, genericType));
+ }
+
+ @Override
+ public Mono<Response> method(String name, Entity<?> entity) {
+ return method(name, entity, Response.class);
+ }
+
+ @Override
+ public <R> Mono<R> method(String name, Entity<?> entity, Class<R> responseType) {
+ return mono(webClient.async().method(name, entity, responseType));
+ }
+
+ @Override
+ public <T> Flux<T> flux(String name, Entity<?> entity, Class<T> responseType) {
+ Future<Response> futureResponse = webClient.async().method(name, entity);
+ return Flux.fromIterable(toIterable(futureResponse, responseType));
+ }
+
+ @Override
+ public <T> Flux<T> flux(String name, Class<T> responseType) {
+ Future<Response> futureResponse = webClient.async().method(name);
+ return Flux.fromIterable(toIterable(futureResponse, responseType));
+ }
+
+ @Override
+ public <R> Mono<R> method(String name, Entity<?> entity, GenericType<R> genericType) {
+ return mono(webClient.async().method(name, entity, genericType));
+ }
+
+ private <R> Mono<R> mono(Future<R> future) {
+ return Mono.fromFuture(toCompletableFuture(future, executorService));
+ }
+
+ private <R> Iterable<R> toIterable(Future<Response> futureResponse, Class<R> type) {
+ try {
+ Response response = futureResponse.get();
+ GenericType<List<R>> rGenericType = new GenericType<>(new WrappedType<R>(type));
+ return response.readEntity(rGenericType);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new CompletionException(e);
+ }
+ }
+
+ private class WrappedType<R> implements ParameterizedType {
+ private final Class<R> rClass;
+
+ WrappedType(Class<R> rClass) {
+ this.rClass = rClass;
+ }
+
+ @Override
+ public Type[] getActualTypeArguments() {
+ return new Type[]{rClass };
+ }
+
+ @Override
+ public Type getRawType() {
+ return List.class;
+ }
+
+ @Override
+ public Type getOwnerType() {
+ return List.class;
+ }
+ }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerProvider.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerProvider.java
new file mode 100644
index 0000000..f1553f7
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.client;
+
+import java.util.concurrent.ExecutorService;
+import javax.ws.rs.client.RxInvokerProvider;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.ext.Provider;
+import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
+
+@Provider
+public class ReactorInvokerProvider implements RxInvokerProvider<ReactorInvoker> {
+ @Override
+ public boolean isProviderFor(Class<?> invokerType) {
+ return ReactorInvoker.class.isAssignableFrom(invokerType);
+ }
+
+ @Override
+ public ReactorInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
+ return new ReactorInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), executorService);
+ }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorUtils.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorUtils.java
new file mode 100644
index 0000000..7de3d61
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorUtils.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+
+final class ReactorUtils {
+ static final String TRACE = "TRACE";
+ private ReactorUtils() {
+
+ }
+ static <R> CompletableFuture<R> toCompletableFuture(Future<R> future, Executor executor) {
+ Supplier<R> supplier = () -> {
+ try {
+ return future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new CompletionException(e);
+ }
+ };
+ if (executor != null) {
+ return CompletableFuture.supplyAsync(supplier, executor);
+ } else {
+ return CompletableFuture.supplyAsync(supplier);
+ }
+ }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/AbstractSubscriber.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/AbstractSubscriber.java
new file mode 100644
index 0000000..18950bc
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/AbstractSubscriber.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.server;
+
+import java.util.List;
+import javax.ws.rs.container.AsyncResponse;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+public abstract class AbstractSubscriber<T> implements Subscriber<T> {
+
+ private AsyncResponse ar;
+ private Subscription subscription;
+
+ protected AbstractSubscriber(AsyncResponse ar) {
+ this.ar = ar;
+ }
+ public void resume(T response) {
+ ar.resume(response);
+ }
+
+ public void resume(List<T> response) {
+ ar.resume(response);
+ }
+
+ public void resume(StreamingResponse<T> response) {
+ ar.resume(response);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ ar.resume(t);
+ }
+
+ @Override
+ public void onSubscribe(Subscription inSubscription) {
+ this.subscription = inSubscription;
+ requestAll();
+ }
+
+ @Override
+ public void onNext(T t) {
+ resume(t);
+ }
+
+ @Override
+ public void onComplete() {
+ }
+
+ protected AsyncResponse getAsyncResponse() {
+ return ar;
+ }
+
+ protected Subscription getSubscription() {
+ return subscription;
+ }
+
+ protected void requestNext() {
+ request(1);
+ }
+
+ protected void requestAll() {
+ request(Long.MAX_VALUE);
+ }
+
+ protected final void request(long elements) {
+ this.subscription.request(elements);
+ }
+}
\ No newline at end of file
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/JsonStreamingAsyncSubscriber.java
new file mode 100644
index 0000000..a5aa780
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/JsonStreamingAsyncSubscriber.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.server;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
+ public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
+ this(ar, 1000);
+ }
+ public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
+ this(ar, pollTimeout, 0);
+ }
+ public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
+ this(ar, "[", "]", ",", pollTimeout, asyncTimeout);
+ }
+ public JsonStreamingAsyncSubscriber(AsyncResponse ar, String prefix, String suffix, String separator,
+ long pollTimeout, long asyncTimeout) {
+ super(ar, prefix, suffix, separator, pollTimeout, asyncTimeout);
+ }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
new file mode 100644
index 0000000..6779e04
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.server;
+
+import java.util.function.Consumer;
+import org.apache.cxf.jaxrs.JAXRSInvoker;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.message.Message;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class ReactorInvoker extends JAXRSInvoker {
+ @Override
+ protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) {
+ if (result instanceof Flux) {
+ final Flux<?> flux = (Flux<?>) result;
+ final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+ flux.doOnNext(asyncResponse::resume)
+ .doOnError(asyncResponse::resume)
+ .doOnComplete(asyncResponse::onComplete)
+ .subscribe();
+ return asyncResponse;
+ } else if (result instanceof Mono) {
+ // mono is only 0 or 1 element, so when something comes in need to complete the async
+ final Mono<?> flux = (Mono<?>) result;
+ final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+ flux.doOnNext((Consumer<Object>) o -> {
+ asyncResponse.resume(o);
+ asyncResponse.onComplete();
+ })
+ .doOnError((Consumer<Throwable>) throwable -> {
+ asyncResponse.resume(throwable);
+ asyncResponse.onComplete();
+ })
+ .subscribe();
+ return asyncResponse;
+ }
+ return null;
+ }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/StreamingAsyncSubscriber.java
new file mode 100644
index 0000000..7b1efa1
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/StreamingAsyncSubscriber.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.server;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+import org.reactivestreams.Subscription;
+
+public class StreamingAsyncSubscriber<T> extends AbstractSubscriber<T> {
+ private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
+ private String openTag;
+ private String closeTag;
+ private String separator;
+ private long pollTimeout;
+ private long asyncTimeout;
+ private volatile boolean completed;
+ private AtomicBoolean firstWriteDone = new AtomicBoolean();
+ public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
+ this(ar, openTag, closeTag, sep, 1000);
+ }
+ public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
+ long pollTimeout) {
+ this(ar, openTag, closeTag, sep, pollTimeout, 0);
+ }
+ public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
+ long pollTimeout, long asyncTimeout) {
+ super(ar);
+ this.openTag = openTag;
+ this.closeTag = closeTag;
+ this.separator = sep;
+ this.pollTimeout = pollTimeout;
+ this.asyncTimeout = 0;
+ if (asyncTimeout > 0) {
+ ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+ ar.setTimeoutHandler(new TimeoutHandlerImpl());
+ }
+ }
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ if (asyncTimeout == 0) {
+ resumeAsyncResponse();
+ }
+ super.onSubscribe(subscription);
+ }
+ @Override
+ public void onComplete() {
+ completed = true;
+ }
+
+ @Override
+ public void onNext(T bean) {
+ if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
+ resumeAsyncResponse();
+ }
+ queue.add(bean);
+ super.requestNext();
+ }
+ protected void resumeAsyncResponse() {
+ resume(new StreamingResponseImpl());
+ }
+
+ private class StreamingResponseImpl implements StreamingResponse<T> {
+
+ @Override
+ public void writeTo(Writer<T> writer) throws IOException {
+ if (openTag != null) {
+ writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
+ }
+ while (!completed || !queue.isEmpty()) {
+ try {
+ T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
+ if (bean != null) {
+ if (firstWriteDone.getAndSet(true)) {
+ writer.getEntityStream().write(StringUtils.toBytesUTF8(separator));
+ }
+ writer.write(bean);
+ }
+ } catch (InterruptedException ex) {
+ // ignore
+ }
+ }
+ if (closeTag != null) {
+ writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
+ }
+
+ }
+
+ }
+ public class TimeoutHandlerImpl implements TimeoutHandler {
+
+ @Override
+ public void handleTimeout(AsyncResponse asyncResponse) {
+ if (queue.isEmpty()) {
+ asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+ } else {
+ resumeAsyncResponse();
+ }
+
+ }
+
+ }
+}
+
diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/rx/pom.xml
index 03b1147..4eba638 100644
--- a/rt/rs/extensions/rx/pom.xml
+++ b/rt/rs/extensions/rx/pom.xml
@@ -58,6 +58,5 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
-
</dependencies>
</project>
diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml
index 2336c4b..5a7b89a 100644
--- a/rt/rs/pom.xml
+++ b/rt/rs/pom.xml
@@ -38,6 +38,7 @@
<module>extensions/providers</module>
<module>extensions/search</module>
<module>extensions/rx</module>
+ <module>extensions/reactor</module>
<module>security</module>
<module>sse</module>
</modules>
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index 7517079..d8ea6e6 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -60,12 +60,14 @@
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
- <version>${cxf.rxjava.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
- <version>${cxf.rxjava2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>javax.el</groupId>
@@ -314,6 +316,11 @@
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-extension-reactor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-security-cors</artifactId>
<version>${project.version}</version>
<scope>test</scope>
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
new file mode 100644
index 0000000..f6ac2ac
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.ws.rs.client.ClientBuilder;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.reactor.client.ReactorInvoker;
+import org.apache.cxf.jaxrs.reactor.client.ReactorInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class FluxReactorTest extends AbstractBusClientServerTestBase {
+ public static final String PORT = ReactorServer.PORT;
+ @BeforeClass
+ public static void startServers() throws Exception {
+ AbstractResourceInfo.clearAllMaps();
+ assertTrue("server did not launch correctly", launchServer(ReactorServer.class, true));
+ createStaticBus();
+ }
+ @Test
+ public void testGetHelloWorldJson() throws Exception {
+ String address = "http://localhost:" + PORT + "/reactor/flux/textJson";
+ List<HelloWorldBean> collector = new ArrayList<>();
+ ClientBuilder.newClient()
+ .register(new JacksonJsonProvider())
+ .register(new ReactorInvokerProvider())
+ .target(address)
+ .request("application/json")
+ .rx(ReactorInvoker.class)
+ .get(HelloWorldBean.class)
+ .doOnNext(collector::add)
+ .subscribe();
+ Thread.sleep(500);
+ assertEquals(1, collector.size());
+ HelloWorldBean bean = collector.get(0);
+ assertEquals("Hello", bean.getGreeting());
+ assertEquals("World", bean.getAudience());
+ }
+
+ @Test
+ public void testTextJsonImplicitListAsyncStream() throws Exception {
+ String address = "http://localhost:" + PORT + "/reactor/flux/textJsonImplicitListAsyncStream";
+ List<HelloWorldBean> holder = new ArrayList<>();
+ ClientBuilder.newClient()
+ .register(new JacksonJsonProvider())
+ .register(new ReactorInvokerProvider())
+ .target(address)
+ .request("application/json")
+ .rx(ReactorInvoker.class)
+ .getFlux(HelloWorldBean.class)
+ .doOnNext(holder::add)
+ .subscribe();
+ Thread.sleep(500);
+ assertEquals(2, holder.size());
+ assertEquals("Hello", holder.get(0).getGreeting());
+ assertEquals("World", holder.get(0).getAudience());
+ assertEquals("Ciao", holder.get(1).getGreeting());
+ }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
new file mode 100644
index 0000000..3584636
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import org.apache.cxf.jaxrs.reactor.server.JsonStreamingAsyncSubscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+
+@Path("/reactor/flux")
+public class FluxService {
+
+ @GET
+ @Produces("application/json")
+ @Path("textJson")
+ public Flux<HelloWorldBean> getJson() {
+ return Flux.just(new HelloWorldBean());
+ }
+
+ @GET
+ @Produces("application/json")
+ @Path("textJsonImplicitListAsyncStream")
+ public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
+ Flux.just("Hello", "Ciao")
+ .map(HelloWorldBean::new)
+ .subscribeOn(Schedulers.parallel())
+ .subscribe(new JsonStreamingAsyncSubscriber<>(ar));
+ }
+}
\ No newline at end of file
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/HelloWorldBean.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/HelloWorldBean.java
new file mode 100644
index 0000000..3e9d3da
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/HelloWorldBean.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+public class HelloWorldBean {
+ private String greeting;
+ private String audience = "World";
+ public HelloWorldBean() {
+ this("Hello");
+ }
+ public HelloWorldBean(String greeting) {
+ this.greeting = greeting;
+ }
+
+ public String getGreeting() {
+ return greeting;
+ }
+ public void setGreeting(String greeting) {
+ this.greeting = greeting;
+ }
+ public String getAudience() {
+ return audience;
+ }
+ public void setAudience(String audience) {
+ this.audience = audience;
+ }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
new file mode 100644
index 0000000..2096969
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.ws.rs.client.ClientBuilder;
+import javax.xml.ws.Holder;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.reactor.client.ReactorInvoker;
+import org.apache.cxf.jaxrs.reactor.client.ReactorInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class MonoReactorTest extends AbstractBusClientServerTestBase {
+ public static final String PORT = ReactorServer.PORT;
+ @BeforeClass
+ public static void startServers() throws Exception {
+ AbstractResourceInfo.clearAllMaps();
+ assertTrue("server did not launch correctly", launchServer(ReactorServer.class, true));
+ createStaticBus();
+ }
+ @Test
+ public void testGetHelloWorldJson() throws Exception {
+ String address = "http://localhost:" + PORT + "/reactor/mono/textJson";
+ List<HelloWorldBean> holder = new ArrayList<>();
+ ClientBuilder.newClient()
+ .register(new JacksonJsonProvider())
+ .register(new ReactorInvokerProvider())
+ .target(address)
+ .request("application/json")
+ .rx(ReactorInvoker.class)
+ .get(HelloWorldBean.class)
+ .doOnNext(holder::add)
+ .subscribe();
+ Thread.sleep(500);
+ assertEquals(1, holder.size());
+ HelloWorldBean bean = holder.get(0);
+ assertEquals("Hello", bean.getGreeting());
+ assertEquals("World", bean.getAudience());
+ }
+
+ @Test
+ public void testTextJsonImplicitListAsyncStream() throws Exception {
+ String address = "http://localhost:" + PORT + "/reactor/mono/textJsonImplicitListAsyncStream";
+ Holder<HelloWorldBean> holder = new Holder<>();
+ ClientBuilder.newClient()
+ .register(new JacksonJsonProvider())
+ .register(new ReactorInvokerProvider())
+ .target(address)
+ .request("application/json")
+ .rx(ReactorInvoker.class)
+ .get(HelloWorldBean.class)
+ .doOnNext(helloWorldBean -> holder.value = helloWorldBean)
+ .subscribe();
+ Thread.sleep(500);
+ assertEquals("Hello", holder.value.getGreeting());
+ assertEquals("World", holder.value.getAudience());
+ }
+
+ @Test
+ public void testGetString() throws Exception {
+ String address = "http://localhost:" + PORT + "/reactor/mono/textAsync";
+ Holder<String> holder = new Holder<>();
+ ClientBuilder.newClient()
+ .register(new ReactorInvokerProvider())
+ .target(address)
+ .request("text/plain")
+ .rx(ReactorInvoker.class)
+ .get(String.class)
+ .doOnNext(msg -> holder.value = msg)
+ .subscribe();
+
+ Thread.sleep(500);
+ assertEquals("Hello, world!", holder.value);
+ }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
new file mode 100644
index 0000000..ce50dca
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import org.apache.cxf.jaxrs.reactor.server.AbstractSubscriber;
+import org.apache.cxf.jaxrs.reactor.server.JsonStreamingAsyncSubscriber;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+@Path("/reactor/mono")
+public class MonoService {
+
+ @GET
+ @Produces("application/json")
+ @Path("textJson")
+ public Mono<HelloWorldBean> getJson() {
+ return Mono.just(new HelloWorldBean());
+ }
+
+ @GET
+ @Produces("application/json")
+ @Path("textJsonImplicitListAsyncStream")
+ public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
+ Mono.just("Hello")
+ .map(HelloWorldBean::new)
+ .subscribeOn(Schedulers.elastic())
+ .subscribe(new JsonStreamingAsyncSubscriber<>(ar, null, null, null, 1000, 0));
+ }
+
+ @GET
+ @Produces("text/plain")
+ @Path("textAsync")
+ public void getTextAsync(@Suspended final AsyncResponse ar) {
+ Mono.just("Hello, ").map(s -> s + "world!")
+ .subscribe(new StringAsyncSubscriber(ar));
+
+ }
+
+ private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
+ StringAsyncSubscriber(AsyncResponse ar) {
+ super(ar);
+ }
+ }
+}
\ No newline at end of file
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
new file mode 100644
index 0000000..831d903
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import java.util.Collections;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
+import org.apache.cxf.jaxrs.reactor.server.ReactorInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+public class ReactorServer extends AbstractBusTestServerBase {
+ public static final String PORT = allocatePort(ReactorServer.class);
+
+ org.apache.cxf.endpoint.Server server;
+
+ @Override
+ protected void run() {
+ Bus bus = BusFactory.getDefaultBus();
+ // Make sure default JSONProvider is not loaded
+ bus.setProperty("skip.default.json.provider.registration", true);
+ JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+ sf.setInvoker(new ReactorInvoker());
+ sf.setProvider(new JacksonJsonProvider());
+ StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
+ streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
+ sf.setProvider(streamProvider);
+ sf.getOutInterceptors().add(new LoggingOutInterceptor());
+ sf.setResourceClasses(FluxService.class, MonoService.class);
+ sf.setResourceProvider(FluxService.class,
+ new SingletonResourceProvider(new FluxService(), true));
+ sf.setResourceProvider(MonoService.class,
+ new SingletonResourceProvider(new MonoService(), true));
+ sf.setAddress("http://localhost:" + PORT + "/");
+ server = sf.create();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ server.stop();
+ server.destroy();
+ server = null;
+ }
+
+ public static void main(String[] args) {
+ ReactorServer server = new ReactorServer();
+ System.out.println("Go to http://localhost:" + PORT + "/reactor/flux/textJsonImplicitListAsyncStream");
+ server.start();
+
+ }
+}
--
To stop receiving notification emails like this one, please contact
['"commits@cxf.apache.org" <co...@cxf.apache.org>'].