You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/10/24 15:37:06 UTC
[cxf] branch master updated: CXF-8358: RxJava2: Observable /
Flowable Returns Mixed Response on Errors and Hangs when Empty
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new 18bae95 CXF-8358: RxJava2: Observable / Flowable Returns Mixed Response on Errors and Hangs when Empty
new 9897b1c Merge pull request #711 from reta/CXF-8358
18bae95 is described below
commit 18bae95fa91c083d1229ac68b7c9dc0cd3ef13b4
Author: reta <dr...@gmail.com>
AuthorDate: Mon Oct 19 20:29:27 2020 -0400
CXF-8358: RxJava2: Observable / Flowable Returns Mixed Response on Errors and Hangs when Empty
---
.../cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java | 2 +
.../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java | 17 +-
.../jaxrs/reactive/JAXRSRxJava2FlowableTest.java | 183 +++++++++++++++++++++
.../jaxrs/reactive/JAXRSRxJava2ObservableTest.java | 71 ++++++++
.../jaxrs/reactive/JAXRSRxJava2SingleTest.java | 113 +++++++++++++
.../jaxrs/reactive/RxJava2FlowableServer.java | 2 +
.../jaxrs/reactive/RxJava2FlowableService.java | 74 +++++++++
.../jaxrs/reactive/RxJava2ObservableServer.java | 5 +-
.../jaxrs/reactive/RxJava2ObservableService.java | 27 ++-
...ervableServer.java => RxJava2SingleServer.java} | 18 +-
...vableService.java => RxJava2SingleService.java} | 59 ++++---
11 files changed, 535 insertions(+), 36 deletions(-)
diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java
index e57bb39..c9a89a0 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java
@@ -20,6 +20,7 @@ package org.apache.cxf.jaxrs.rx2.server;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.jaxrs.reactivestreams.server.ResponseStatusOnlyExceptionMapper;
import org.apache.cxf.service.invoker.Invoker;
public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
@@ -31,6 +32,7 @@ public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
if (useStreamingSubscriber != null) {
invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
}
+ bean.setProvider(new ResponseStatusOnlyExceptionMapper());
return invoker;
}
}
diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
index 35c0b08..4b87432 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
@@ -18,6 +18,8 @@
*/
package org.apache.cxf.jaxrs.rx2.server;
+import java.util.Collections;
+
import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
import org.apache.cxf.message.Message;
@@ -51,7 +53,7 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) {
final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
if (!isStreamingSubscriberUsed(f, asyncResponse, inMessage)) {
- Disposable d = f.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+ Disposable d = subscribe(f, asyncResponse);
if (d == null) {
throw new IllegalStateException("Subscribe did not return a Disposable");
}
@@ -61,11 +63,22 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
protected AsyncResponseImpl handleObservable(Message inMessage, Observable<?> obs) {
final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
- Disposable d = obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
+ Disposable d = subscribe(obs, asyncResponse);
if (d == null) {
throw new IllegalStateException("Subscribe did not return a Disposable");
}
return asyncResponse;
}
+ private <T> Disposable subscribe(final Flowable<T> f, final AsyncResponseImpl asyncResponse) {
+ return f
+ .switchIfEmpty(Flowable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
+ .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+ }
+
+ private <T> Disposable subscribe(final Observable<T> obs, final AsyncResponseImpl asyncResponse) {
+ return obs
+ .switchIfEmpty(Observable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
+ .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+ }
}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
index ca1e658..d8ab510 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
@@ -24,10 +24,13 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
@@ -152,4 +155,184 @@ public class JAXRSRxJava2FlowableTest extends AbstractBusClientServerTestBase {
subscriber.await(1, TimeUnit.SECONDS);
subscriber.assertError(NotFoundException.class);
}
+
+ @Test
+ public void testGetHelloWorldEmpty() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/flowable/empty";
+
+ final Flowable<Response> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get();
+
+ final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS);
+ subscriber
+ .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+ .assertComplete();
+ }
+
+ @Test
+ public void testGetHelloWorldEmpty2() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx22/flowable/empty";
+
+ final Flowable<Response> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get();
+
+ final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS);
+ subscriber
+ .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+ .assertComplete();
+ }
+
+ @Test
+ public void testFlowableImmediateErrors() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx22/flowable/immediate/errors";
+
+ final Flowable<HelloWorldBean> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get(HelloWorldBean.class);
+
+ final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS);
+ subscriber.assertError(InternalServerErrorException.class);
+ }
+
+ @Test
+ public void testFlowableErrorWithExceptionMapperReturnsNoExceptionPayload() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx22/flowable/mixed/error";
+
+ final Flowable<Response> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get();
+
+ final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ // The response should not include the exception payload (injected by exception mapper)
+ // if some elements have been emitted before
+ subscriber.await(3, TimeUnit.SECONDS);
+ subscriber
+ .assertValue(r -> r.getStatus() == 409 && !r.readEntity(String.class).contains("stackTrace"))
+ .assertComplete();
+ }
+
+ @Test
+ public void testFlowableErrorWithExceptionMapperReturnsContentPayload() throws Exception {
+ GenericType<List<HelloWorldBean>> helloWorldBeanListType = new GenericType<List<HelloWorldBean>>() { };
+ String address = "http://localhost:" + PORT + "/rx22/flowable/mixed/error";
+
+ final Flowable<Response> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get();
+
+ final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ // The response should include the emitted elements prior the error
+ subscriber.await(3, TimeUnit.SECONDS);
+ subscriber
+ .assertValue(r -> r.getStatus() == 409 && r.readEntity(helloWorldBeanListType).size() == 4)
+ .assertComplete();
+ }
+
+ @Test
+ public void testFlowableErrorsResponseWithMapper() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx22/flowable/mapper/errors";
+
+ final Flowable<Response> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get();
+
+ final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS);
+ subscriber
+ .assertValue(r -> r.getStatus() == 400)
+ .assertComplete();
+ }
+
+ @Test
+ public void testFlowableErrorWithWebException() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx22/flowable/web/errors";
+
+ final Flowable<Response> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get();
+
+ final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ // The response should not include the exception payload (injected by exception mapper)
+ // if some elements have been emitted before
+ subscriber.await(3, TimeUnit.SECONDS);
+ subscriber
+ .assertValue(r -> r.getStatus() == 403 && !r.readEntity(String.class).contains("stackTrace"))
+ .assertComplete();
+ }
+
+ @Test
+ public void testFlowableImmediateErrorsWithExceptionMapper() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx22/flowable/immediate/mapper/errors";
+
+ final Flowable<Response> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get();
+
+ final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS);
+ subscriber
+ .assertValue(r -> r.getStatus() == 409 && r.readEntity(String.class).contains("stackTrace"))
+ .assertComplete();
+ }
}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
index ae36b69..b9123d7 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
@@ -22,8 +22,13 @@ package org.apache.cxf.systest.jaxrs.reactive;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import javax.xml.ws.Holder;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
@@ -36,6 +41,7 @@ import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
+import io.reactivex.observers.TestObserver;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -88,6 +94,71 @@ public class JAXRSRxJava2ObservableTest extends AbstractBusClientServerTestBase
String address = "http://localhost:" + PORT + "/rx2/observable/textJsonList";
doTestGetHelloWorldJsonList(address);
}
+
+ @Test
+ public void testGetHelloWorldEmpty() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/observable/empty";
+
+ final Observable<Response> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new ObservableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(ObservableRxInvoker.class)
+ .get();
+
+ final TestObserver<Response> subscriber = new TestObserver<>();
+ obs.subscribe(subscriber);
+
+
+ subscriber.await(3, TimeUnit.SECONDS);
+ subscriber
+ .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+ .assertComplete();
+ }
+
+ @Test
+ public void testObservableImmediateErrors() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/observable/immediate/errors";
+
+ final Observable<HelloWorldBean> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new ObservableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(ObservableRxInvoker.class)
+ .get(HelloWorldBean.class);
+
+ final TestObserver<HelloWorldBean> subscriber = new TestObserver<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS);
+ subscriber.assertError(InternalServerErrorException.class);
+ }
+
+ @Test
+ public void testObservableImmediateErrorsWithExceptionMapper() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/observable/immediate/mapper/errors";
+
+ final Observable<Response> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new ObservableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(ObservableRxInvoker.class)
+ .get();
+
+ final TestObserver<Response> subscriber = new TestObserver<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS);
+ subscriber
+ .assertValue(r -> r.getStatus() == 409 && r.readEntity(String.class).contains("stackTrace"))
+ .assertComplete();
+ }
private void doTestGetHelloWorldJsonList(String address) throws Exception {
WebClient wc = WebClient.create(address,
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2SingleTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2SingleTest.java
new file mode 100644
index 0000000..91fd900
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2SingleTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.MediaType;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.Flowable;
+import io.reactivex.subscribers.TestSubscriber;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JAXRSRxJava2SingleTest extends AbstractBusClientServerTestBase {
+ public static final String PORT = RxJava2SingleServer.PORT;
+ @BeforeClass
+ public static void startServers() throws Exception {
+ AbstractResourceInfo.clearAllMaps();
+ assertTrue("server did not launch correctly", launchServer(RxJava2SingleServer.class, true));
+ createStaticBus();
+ }
+ @Test
+ public void testGetHelloWorldJson() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/single/textJson";
+
+ final Flowable<HelloWorldBean> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get(HelloWorldBean.class);
+
+ final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS);
+ subscriber
+ .assertValue(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+ .assertComplete();
+ }
+
+ @Test
+ public void testGetString() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/single/textAsync";
+
+ final Flowable<String> obs = ClientBuilder
+ .newClient()
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.TEXT_PLAIN)
+ .rx(FlowableRxInvoker.class)
+ .get(String.class);
+
+ final TestSubscriber<String> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS);
+ subscriber
+ .assertValue(r -> "Hello, world!".equals(r))
+ .assertComplete();
+ }
+
+ @Test
+ public void testGetError() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/single/error";
+
+ final Flowable<String> obs = ClientBuilder
+ .newClient()
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get(String.class);
+
+ final TestSubscriber<String> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS);
+ subscriber.assertError(InternalServerErrorException.class);
+ }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
index 2b18c83..76a8b40 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
@@ -51,6 +51,8 @@ public class RxJava2FlowableServer extends AbstractBusTestServerBase {
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
sf.getProperties(true).put("useStreamingSubscriber", useStreamingSubscriber);
sf.setProvider(new JacksonJsonProvider());
+ sf.setProvider(new IllegalArgumentExceptionMapper());
+ sf.setProvider(new IllegalStateExceptionMapper());
new ReactiveIOCustomizer().customize(sf);
sf.getOutInterceptors().add(new LoggingOutInterceptor());
sf.setResourceClasses(RxJava2FlowableService.class);
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
index 86e68e5..c739328 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
@@ -24,11 +24,14 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import javax.ws.rs.ForbiddenException;
import javax.ws.rs.GET;
+import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
@@ -123,6 +126,77 @@ public class RxJava2FlowableService {
}
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/empty")
+ public Flowable<HelloWorldBean> empty() {
+ return Flowable.empty();
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/mapper/errors")
+ public Flowable<HelloWorldBean> mapperErrors() {
+ return Flowable
+ .range(1, 3)
+ .flatMap(item -> {
+ if (item < 3) {
+ return Flowable.just(new HelloWorldBean("Person " + item));
+ } else {
+ return Flowable.error(new IllegalArgumentException("Oops"));
+ }
+ });
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/web/errors")
+ public Flowable<HelloWorldBean> webErrors() {
+ return Flowable
+ .range(1, 3)
+ .concatMap(item -> {
+ if (item < 3) {
+ return Flowable.just(new HelloWorldBean("Person " + item));
+ } else {
+ return Flowable.error(new ForbiddenException("Oops"));
+ }
+ });
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/immediate/errors")
+ public Flowable<HelloWorldBean> immediateErrors() {
+ return Flowable
+ .range(1, 2)
+ .flatMap(item -> Flowable.error(new RuntimeException("Oops")));
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/immediate/mapper/errors")
+ public Flowable<HelloWorldBean> immediateMapperErrors() {
+ return Flowable
+ .range(1, 2)
+ .flatMap(item -> Flowable.error(new IllegalStateException("Oops")));
+ }
+
+ @GET
+ @Path("/mixed/error")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Flowable<HelloWorldBean> errorAndData() {
+ return Flowable
+ .range(1, 5)
+ .flatMap(item -> {
+ if (item <= 4) {
+ return Flowable.just(new HelloWorldBean(" of Item: " + item));
+ } else {
+ return Flowable.error(new NotFoundException("Item not found"));
+ }
+ })
+ .onErrorResumeNext((Throwable e) -> Flowable.error(new IllegalStateException("Oops", e)));
+ }
+
private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
private StringBuilder sb = new StringBuilder();
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
index a8849d1..294649b 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
@@ -26,7 +26,7 @@ import org.apache.cxf.BusFactory;
import org.apache.cxf.ext.logging.LoggingOutInterceptor;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker;
+import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
@@ -42,8 +42,9 @@ public class RxJava2ObservableServer extends AbstractBusTestServerBase {
// Make sure default JSONProvider is not loaded
bus.setProperty("skip.default.json.provider.registration", true);
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
- sf.setInvoker(new ReactiveIOInvoker());
sf.setProvider(new JacksonJsonProvider());
+ sf.setProvider(new IllegalStateExceptionMapper());
+ new ReactiveIOCustomizer().customize(sf);
sf.getOutInterceptors().add(new LoggingOutInterceptor());
sf.setResourceClasses(RxJava2ObservableService.class);
sf.setResourceProvider(RxJava2ObservableService.class,
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
index f08147d..362c1ce 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
@@ -25,6 +25,7 @@ import java.util.List;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
import io.reactivex.Observable;
@@ -54,5 +55,29 @@ public class RxJava2ObservableService {
bean2.setGreeting("Ciao");
return Observable.just(Arrays.asList(bean1, bean2));
}
-
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/empty")
+ public Observable<HelloWorldBean> empty() {
+ return Observable.empty();
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/immediate/errors")
+ public Observable<HelloWorldBean> immediateErrors() {
+ return Observable
+ .range(1, 2)
+ .flatMap(item -> Observable.error(new RuntimeException("Oops")));
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/immediate/mapper/errors")
+ public Observable<HelloWorldBean> immediateMapperErrors() {
+ return Observable
+ .range(1, 2)
+ .flatMap(item -> Observable.error(new IllegalStateException("Oops")));
+ }
}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleServer.java
similarity index 80%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleServer.java
index a8849d1..d3d881d 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleServer.java
@@ -26,15 +26,15 @@ import org.apache.cxf.BusFactory;
import org.apache.cxf.ext.logging.LoggingOutInterceptor;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker;
+import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-public class RxJava2ObservableServer extends AbstractBusTestServerBase {
- public static final String PORT = allocatePort(RxJava2ObservableServer.class);
+public class RxJava2SingleServer extends AbstractBusTestServerBase {
+ public static final String PORT = allocatePort(RxJava2SingleServer.class);
org.apache.cxf.endpoint.Server server;
- public RxJava2ObservableServer() {
+ public RxJava2SingleServer() {
}
protected void run() {
@@ -42,12 +42,12 @@ public class RxJava2ObservableServer extends AbstractBusTestServerBase {
// Make sure default JSONProvider is not loaded
bus.setProperty("skip.default.json.provider.registration", true);
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
- sf.setInvoker(new ReactiveIOInvoker());
sf.setProvider(new JacksonJsonProvider());
+ new ReactiveIOCustomizer().customize(sf);
sf.getOutInterceptors().add(new LoggingOutInterceptor());
- sf.setResourceClasses(RxJava2ObservableService.class);
- sf.setResourceProvider(RxJava2ObservableService.class,
- new SingletonResourceProvider(new RxJava2ObservableService(), true));
+ sf.setResourceClasses(RxJava2SingleService.class);
+ sf.setResourceProvider(RxJava2SingleService.class,
+ new SingletonResourceProvider(new RxJava2SingleService(), true));
sf.setAddress("http://localhost:" + PORT + "/");
server = sf.create();
}
@@ -60,7 +60,7 @@ public class RxJava2ObservableServer extends AbstractBusTestServerBase {
public static void main(String[] args) {
try {
- RxJava2ObservableServer s = new RxJava2ObservableServer();
+ RxJava2SingleServer s = new RxJava2SingleServer();
s.start();
} catch (Exception ex) {
ex.printStackTrace();
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleService.java
similarity index 50%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleService.java
index f08147d..77a7302 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleService.java
@@ -19,40 +19,55 @@
package org.apache.cxf.systest.jaxrs.reactive;
-import java.util.Arrays;
-import java.util.List;
-
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+
+import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
+
+import io.reactivex.Single;
-import io.reactivex.Observable;
+@Path("/rx2/single")
+public class RxJava2SingleService {
-@Path("/rx2/observable")
-public class RxJava2ObservableService {
+ @GET
+ @Produces("application/json")
+ @Path("textJson")
+ public Single<HelloWorldBean> getJson() {
+ return Single.just(new HelloWorldBean());
+ }
@GET
@Produces("text/plain")
- @Path("text")
- public Observable<String> getText() {
- return Observable.just("Hello, world!");
+ @Path("textAsync")
+ public void getTextAsync(@Suspended final AsyncResponse ar) {
+ final StringAsyncSubscriber subscriber = new StringAsyncSubscriber(ar);
+
+ Single
+ .just("Hello, ")
+ .map(s -> s + "world!")
+ .subscribe(
+ s -> {
+ subscriber.onNext(s);
+ subscriber.onComplete();
+ },
+ subscriber::onError);
+
}
@GET
@Produces("application/json")
- @Path("textJson")
- public Observable<HelloWorldBean> getJson() {
- return Observable.just(new HelloWorldBean());
+ @Path("error")
+ public Single<HelloWorldBean> getError() {
+ return Single.error(new RuntimeException("Oops"));
}
+
- @GET
- @Produces("application/json")
- @Path("textJsonList")
- public Observable<List<HelloWorldBean>> getJsonList() {
- HelloWorldBean bean1 = new HelloWorldBean();
- HelloWorldBean bean2 = new HelloWorldBean();
- bean2.setGreeting("Ciao");
- return Observable.just(Arrays.asList(bean1, bean2));
+ private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
+ StringAsyncSubscriber(AsyncResponse ar) {
+ super(ar);
+ }
}
-
-}
+}
\ No newline at end of file