You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/10/24 15:37:06 UTC

[cxf] branch master updated: CXF-8358: RxJava2: Observable / Flowable Returns Mixed Response on Errors and Hangs when Empty

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 18bae95  CXF-8358: RxJava2: Observable / Flowable Returns Mixed Response on Errors and Hangs when Empty
     new 9897b1c  Merge pull request #711 from reta/CXF-8358
18bae95 is described below

commit 18bae95fa91c083d1229ac68b7c9dc0cd3ef13b4
Author: reta <dr...@gmail.com>
AuthorDate: Mon Oct 19 20:29:27 2020 -0400

    CXF-8358: RxJava2: Observable / Flowable Returns Mixed Response on Errors and Hangs when Empty
---
 .../cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java |   2 +
 .../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java    |  17 +-
 .../jaxrs/reactive/JAXRSRxJava2FlowableTest.java   | 183 +++++++++++++++++++++
 .../jaxrs/reactive/JAXRSRxJava2ObservableTest.java |  71 ++++++++
 .../jaxrs/reactive/JAXRSRxJava2SingleTest.java     | 113 +++++++++++++
 .../jaxrs/reactive/RxJava2FlowableServer.java      |   2 +
 .../jaxrs/reactive/RxJava2FlowableService.java     |  74 +++++++++
 .../jaxrs/reactive/RxJava2ObservableServer.java    |   5 +-
 .../jaxrs/reactive/RxJava2ObservableService.java   |  27 ++-
 ...ervableServer.java => RxJava2SingleServer.java} |  18 +-
 ...vableService.java => RxJava2SingleService.java} |  59 ++++---
 11 files changed, 535 insertions(+), 36 deletions(-)

diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java
index e57bb39..c9a89a0 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java
@@ -20,6 +20,7 @@ package org.apache.cxf.jaxrs.rx2.server;
 
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.jaxrs.reactivestreams.server.ResponseStatusOnlyExceptionMapper;
 import org.apache.cxf.service.invoker.Invoker;
 
 public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
@@ -31,6 +32,7 @@ public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
         if (useStreamingSubscriber != null) {
             invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
         }
+        bean.setProvider(new ResponseStatusOnlyExceptionMapper());
         return invoker;
     }
 }
diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
index 35c0b08..4b87432 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
@@ -18,6 +18,8 @@
  */
 package org.apache.cxf.jaxrs.rx2.server;
 
+import java.util.Collections;
+
 import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
 import org.apache.cxf.message.Message;
@@ -51,7 +53,7 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
     protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
         if (!isStreamingSubscriberUsed(f, asyncResponse, inMessage)) {
-            Disposable d = f.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+            Disposable d = subscribe(f, asyncResponse);
             if (d == null) {
                 throw new IllegalStateException("Subscribe did not return a Disposable");
             }
@@ -61,11 +63,22 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
     
     protected AsyncResponseImpl handleObservable(Message inMessage, Observable<?> obs) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
-        Disposable d = obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
+        Disposable d = subscribe(obs, asyncResponse);
         if (d == null) {
             throw new IllegalStateException("Subscribe did not return a Disposable");
         }
         return asyncResponse;
     }
 
+    private <T> Disposable subscribe(final Flowable<T> f, final AsyncResponseImpl asyncResponse) {
+        return f
+            .switchIfEmpty(Flowable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
+            .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+    }
+
+    private <T> Disposable subscribe(final Observable<T> obs, final AsyncResponseImpl asyncResponse) {
+        return obs
+            .switchIfEmpty(Observable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
+            .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+    }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
index ca1e658..d8ab510 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
@@ -24,10 +24,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import javax.ws.rs.InternalServerErrorException;
 import javax.ws.rs.NotFoundException;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.Invocation;
 import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
@@ -152,4 +155,184 @@ public class JAXRSRxJava2FlowableTest extends AbstractBusClientServerTestBase {
         subscriber.await(1, TimeUnit.SECONDS);
         subscriber.assertError(NotFoundException.class);
     }
+    
+    @Test
+    public void testGetHelloWorldEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/flowable/empty";
+
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+            .assertComplete();
+    }
+
+    @Test
+    public void testGetHelloWorldEmpty2() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx22/flowable/empty";
+
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+            .assertComplete();
+    }
+
+    @Test
+    public void testFlowableImmediateErrors() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx22/flowable/immediate/errors";
+
+        final Flowable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+
+        final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+
+    @Test
+    public void testFlowableErrorWithExceptionMapperReturnsNoExceptionPayload() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx22/flowable/mixed/error";
+
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        // The response should not include the exception payload (injected by exception mapper)
+        // if some elements have been emitted before
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && !r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
+
+    @Test
+    public void testFlowableErrorWithExceptionMapperReturnsContentPayload() throws Exception {
+        GenericType<List<HelloWorldBean>> helloWorldBeanListType = new GenericType<List<HelloWorldBean>>() {  };
+        String address = "http://localhost:" + PORT + "/rx22/flowable/mixed/error";
+
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        // The response should include the emitted elements prior to the error
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && r.readEntity(helloWorldBeanListType).size() == 4)
+            .assertComplete();
+    }
+
+    @Test
+    public void testFlowableErrorsResponseWithMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx22/flowable/mapper/errors";
+
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 400)
+            .assertComplete();
+    }
+
+    @Test
+    public void testFlowableErrorWithWebException() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx22/flowable/web/errors";
+
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        // The response should not include the exception payload (injected by exception mapper)
+        // if some elements have been emitted before
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 403 && !r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
+
+    @Test
+    public void testFlowableImmediateErrorsWithExceptionMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx22/flowable/immediate/mapper/errors";
+
+        final Flowable<Response> obs = ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new FlowableRxInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(FlowableRxInvoker.class)
+                .get();
+
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
index ae36b69..b9123d7 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
@@ -22,8 +22,13 @@ package org.apache.cxf.systest.jaxrs.reactive;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.xml.ws.Holder;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
@@ -36,6 +41,7 @@ import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 
 import io.reactivex.Observable;
 import io.reactivex.disposables.Disposable;
+import io.reactivex.observers.TestObserver;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -88,6 +94,71 @@ public class JAXRSRxJava2ObservableTest extends AbstractBusClientServerTestBase
         String address = "http://localhost:" + PORT + "/rx2/observable/textJsonList";
         doTestGetHelloWorldJsonList(address);
     }
+    
+    @Test
+    public void testGetHelloWorldEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/observable/empty";
+
+        final Observable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new ObservableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(ObservableRxInvoker.class)
+            .get();
+
+        final TestObserver<Response> subscriber = new TestObserver<>();
+        obs.subscribe(subscriber);
+
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+            .assertComplete();
+    }
+
+    @Test
+    public void testObservableImmediateErrors() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/observable/immediate/errors";
+
+        final Observable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new ObservableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(ObservableRxInvoker.class)
+            .get(HelloWorldBean.class);
+
+        final TestObserver<HelloWorldBean> subscriber = new TestObserver<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+
+    @Test
+    public void testObservableImmediateErrorsWithExceptionMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/observable/immediate/mapper/errors";
+
+        final Observable<Response> obs = ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ObservableRxInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ObservableRxInvoker.class)
+                .get();
+
+        final TestObserver<Response> subscriber = new TestObserver<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
 
     private void doTestGetHelloWorldJsonList(String address) throws Exception {
         WebClient wc = WebClient.create(address,
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2SingleTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2SingleTest.java
new file mode 100644
index 0000000..91fd900
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2SingleTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.MediaType;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.Flowable;
+import io.reactivex.subscribers.TestSubscriber;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JAXRSRxJava2SingleTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = RxJava2SingleServer.PORT;
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly", launchServer(RxJava2SingleServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/single/textJson";
+
+        final Flowable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+        
+        final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+            .assertComplete();
+    }
+
+    @Test
+    public void testGetString() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/single/textAsync";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.TEXT_PLAIN)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "Hello, world!".equals(r))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testGetError() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/single/error";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
index 2b18c83..76a8b40 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
@@ -51,6 +51,8 @@ public class RxJava2FlowableServer extends AbstractBusTestServerBase {
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
         sf.getProperties(true).put("useStreamingSubscriber", useStreamingSubscriber);
         sf.setProvider(new JacksonJsonProvider());
+        sf.setProvider(new IllegalArgumentExceptionMapper());
+        sf.setProvider(new IllegalStateExceptionMapper());
         new ReactiveIOCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(RxJava2FlowableService.class);
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
index 86e68e5..c739328 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
@@ -24,11 +24,14 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import javax.ws.rs.ForbiddenException;
 import javax.ws.rs.GET;
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
 
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
 import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
@@ -123,6 +126,77 @@ public class RxJava2FlowableService {
 
     }
     
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/empty")
+    public Flowable<HelloWorldBean> empty() { 
+        return Flowable.empty(); 
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/mapper/errors")
+    public Flowable<HelloWorldBean> mapperErrors() { 
+        return Flowable 
+            .range(1, 3) 
+            .flatMap(item -> { 
+                if (item < 3) { 
+                    return Flowable.just(new HelloWorldBean("Person " + item)); 
+                } else { 
+                    return Flowable.error(new IllegalArgumentException("Oops")); 
+                } 
+            }); 
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/web/errors")
+    public Flowable<HelloWorldBean> webErrors() { 
+        return Flowable 
+            .range(1, 3) 
+            .concatMap(item -> { 
+                if (item < 3) { 
+                    return Flowable.just(new HelloWorldBean("Person " + item)); 
+                } else { 
+                    return Flowable.error(new ForbiddenException("Oops")); 
+                } 
+            }); 
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/errors")
+    public Flowable<HelloWorldBean> immediateErrors() { 
+        return Flowable 
+            .range(1, 2) 
+            .flatMap(item -> Flowable.error(new RuntimeException("Oops"))); 
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/mapper/errors")
+    public Flowable<HelloWorldBean> immediateMapperErrors() { 
+        return Flowable 
+            .range(1, 2) 
+            .flatMap(item -> Flowable.error(new IllegalStateException("Oops"))); 
+    }
+
+    @GET
+    @Path("/mixed/error")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Flowable<HelloWorldBean> errorAndData() {
+        return Flowable
+            .range(1, 5)
+            .flatMap(item -> {
+                if (item <= 4) {
+                    return Flowable.just(new HelloWorldBean(" of Item: " + item));
+                } else {
+                    return Flowable.error(new NotFoundException("Item not found"));
+                }
+            })
+            .onErrorResumeNext((Throwable e) -> Flowable.error(new IllegalStateException("Oops", e)));
+    }
+    
     private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
 
         private StringBuilder sb = new StringBuilder();
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
index a8849d1..294649b 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
@@ -26,7 +26,7 @@ import org.apache.cxf.BusFactory;
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker;
+import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
@@ -42,8 +42,9 @@ public class RxJava2ObservableServer extends AbstractBusTestServerBase {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setInvoker(new ReactiveIOInvoker());
         sf.setProvider(new JacksonJsonProvider());
+        sf.setProvider(new IllegalStateExceptionMapper());
+        new ReactiveIOCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(RxJava2ObservableService.class);
         sf.setResourceProvider(RxJava2ObservableService.class,
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
index f08147d..362c1ce 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
@@ -25,6 +25,7 @@ import java.util.List;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
 
 import io.reactivex.Observable;
 
@@ -54,5 +55,29 @@ public class RxJava2ObservableService {
         bean2.setGreeting("Ciao");
         return Observable.just(Arrays.asList(bean1, bean2));
     }
-  
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/empty")
+    public Observable<HelloWorldBean> empty() { 
+        return Observable.empty(); 
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/errors")
+    public Observable<HelloWorldBean> immediateErrors() { 
+        return Observable 
+            .range(1, 2) 
+            .flatMap(item -> Observable.error(new RuntimeException("Oops"))); 
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/mapper/errors")
+    public Observable<HelloWorldBean> immediateMapperErrors() { 
+        return Observable 
+            .range(1, 2) 
+            .flatMap(item -> Observable.error(new IllegalStateException("Oops"))); 
+    }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleServer.java
similarity index 80%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleServer.java
index a8849d1..d3d881d 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleServer.java
@@ -26,15 +26,15 @@ import org.apache.cxf.BusFactory;
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker;
+import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
-public class RxJava2ObservableServer extends AbstractBusTestServerBase {
-    public static final String PORT = allocatePort(RxJava2ObservableServer.class);
+public class RxJava2SingleServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJava2SingleServer.class);
 
     org.apache.cxf.endpoint.Server server;
-    public RxJava2ObservableServer() {
+    public RxJava2SingleServer() {
     }
 
     protected void run() {
@@ -42,12 +42,12 @@ public class RxJava2ObservableServer extends AbstractBusTestServerBase {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setInvoker(new ReactiveIOInvoker());
         sf.setProvider(new JacksonJsonProvider());
+        new ReactiveIOCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
-        sf.setResourceClasses(RxJava2ObservableService.class);
-        sf.setResourceProvider(RxJava2ObservableService.class,
-                               new SingletonResourceProvider(new RxJava2ObservableService(), true));
+        sf.setResourceClasses(RxJava2SingleService.class);
+        sf.setResourceProvider(RxJava2SingleService.class,
+                               new SingletonResourceProvider(new RxJava2SingleService(), true));
         sf.setAddress("http://localhost:" + PORT + "/");
         server = sf.create();
     }
@@ -60,7 +60,7 @@ public class RxJava2ObservableServer extends AbstractBusTestServerBase {
 
     public static void main(String[] args) {
         try {
-            RxJava2ObservableServer s = new RxJava2ObservableServer();
+            RxJava2SingleServer s = new RxJava2SingleServer();
             s.start();
         } catch (Exception ex) {
             ex.printStackTrace();
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleService.java
similarity index 50%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleService.java
index f08147d..77a7302 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleService.java
@@ -19,40 +19,55 @@
 
 package org.apache.cxf.systest.jaxrs.reactive;
 
-import java.util.Arrays;
-import java.util.List;
-
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+
+import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
+
+import io.reactivex.Single;
 
-import io.reactivex.Observable;
+@Path("/rx2/single")
+public class RxJava2SingleService {
 
-@Path("/rx2/observable")
-public class RxJava2ObservableService {
+    @GET
+    @Produces("application/json")
+    @Path("textJson")
+    public Single<HelloWorldBean> getJson() {
+        return Single.just(new HelloWorldBean());
+    }
 
     @GET
     @Produces("text/plain")
-    @Path("text")
-    public Observable<String> getText() {
-        return Observable.just("Hello, world!");
+    @Path("textAsync")
+    public void getTextAsync(@Suspended final AsyncResponse ar) {
+        final StringAsyncSubscriber subscriber = new StringAsyncSubscriber(ar);
+        
+        Single
+            .just("Hello, ")
+            .map(s -> s + "world!")
+            .subscribe(
+                s -> {
+                    subscriber.onNext(s);
+                    subscriber.onComplete();
+                },
+                subscriber::onError);
+
     }
     
     @GET
     @Produces("application/json")
-    @Path("textJson")
-    public Observable<HelloWorldBean> getJson() {
-        return Observable.just(new HelloWorldBean());
+    @Path("error")
+    public Single<HelloWorldBean> getError() {
+        return Single.error(new RuntimeException("Oops"));
     }
+
     
-    @GET
-    @Produces("application/json")
-    @Path("textJsonList")
-    public Observable<List<HelloWorldBean>> getJsonList() {
-        HelloWorldBean bean1 = new HelloWorldBean();
-        HelloWorldBean bean2 = new HelloWorldBean();
-        bean2.setGreeting("Ciao");
-        return Observable.just(Arrays.asList(bean1, bean2));
+    private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
+        StringAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
     }
-  
-}
+}
\ No newline at end of file