You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/10/24 17:59:48 UTC

[cxf] branch 3.2.x-fixes updated (84064ac -> 87f7d09)

This is an automated email from the ASF dual-hosted git repository.

reta pushed a change to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git.


    from 84064ac  Recording .gitmergeinfo Changes
     new 4327ed9  CXF-8358: RxJava2: Observable / Flowable Returns Mixed Response on Errors and Hangs when Empty
     new 87f7d09  Recording .gitmergeinfo Changes

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitmergeinfo                                      |   2 +
 .../cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java |   2 +
 .../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java    |  17 +-
 .../jaxrs/reactive/JAXRSRxJava2FlowableTest.java   | 185 +++++++++++++++++++++
 .../jaxrs/reactive/JAXRSRxJava2ObservableTest.java |  71 ++++++++
 ...SingleTest.java => JAXRSRxJava2SingleTest.java} |  20 +--
 .../jaxrs/reactive/RxJava2FlowableServer.java      |   2 +
 .../jaxrs/reactive/RxJava2FlowableService.java     |  74 +++++++++
 .../jaxrs/reactive/RxJava2ObservableServer.java    |   5 +-
 .../jaxrs/reactive/RxJava2ObservableService.java   |  27 ++-
 ...3SingleServer.java => RxJava2SingleServer.java} |  16 +-
 ...ingleService.java => RxJava2SingleService.java} |   7 +-
 12 files changed, 402 insertions(+), 26 deletions(-)
 copy systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/{JAXRSRxJava3SingleTest.java => JAXRSRxJava2SingleTest.java} (85%)
 copy systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/{RxJava3SingleServer.java => RxJava2SingleServer.java} (83%)
 copy systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/{RxJava3SingleService.java => RxJava2SingleService.java} (95%)


[cxf] 02/02: Recording .gitmergeinfo Changes

Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 87f7d091be402b98f5dda86c083068c5b598d5a3
Author: reta <dr...@gmail.com>
AuthorDate: Sat Oct 24 13:59:29 2020 -0400

    Recording .gitmergeinfo Changes
---
 .gitmergeinfo | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.gitmergeinfo b/.gitmergeinfo
index 4c0e240..f12473a 100644
--- a/.gitmergeinfo
+++ b/.gitmergeinfo
@@ -592,6 +592,7 @@ B 9987ba3645d65d119b45913e83bf34deab7909f6
 B 9991fa7b918e8a82a172f988684258ca03874737
 B 99ce8c28288dcddcb5d52b67dea42db64d46ae73
 B 9a04d23614049158833d0c1af7aee7159b84b9c0
+B 9a27e102e9ac6a1454c347c608796622c3d4eb1b
 B 9ad086a42f9758cb9bfbb8e66ec99d4f339c481f
 B 9ad3799645e1faeb82c4a0c361a1febd5dd0246a
 B 9adbef420bb4a4d8505c65c0b15b05dd8bb4bc09
@@ -1086,6 +1087,7 @@ M 86d7c657499b73bdbbd6ba9f1579e76ca0715357
 M 886a055d49d844e445721e4752c7a6360cbe8b9a
 M 890a0186ff92bcf08a82ecbf4b48c4a37997462c
 M 8911a2394b2f9ef58d9132a22927666bd77adf0d
+M 8b2a6d462035e5eb83fb0ccd7e1812a6341fffa0
 M 8bb64836e76077572349a19829adcbbe9c58689d
 M 8df4b33f4dad2934b05cb1c0e706276f76bc7044
 M 8e78fae173033a3118e0eb278816fc3088aecbea


[cxf] 01/02: CXF-8358: RxJava2: Observable / Flowable Returns Mixed Response on Errors and Hangs when Empty

Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 4327ed983f93e9575592c3d38bf634470a9346d8
Author: reta <dr...@gmail.com>
AuthorDate: Mon Oct 19 20:29:27 2020 -0400

    CXF-8358: RxJava2: Observable / Flowable Returns Mixed Response on Errors and Hangs when Empty
    
    (cherry picked from commit 18bae95fa91c083d1229ac68b7c9dc0cd3ef13b4)
    (cherry picked from commit 8b2a6d462035e5eb83fb0ccd7e1812a6341fffa0)
    
    # Conflicts:
    #	systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
    #	systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
---
 .../cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java |   2 +
 .../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java    |  17 +-
 .../jaxrs/reactive/JAXRSRxJava2FlowableTest.java   | 185 +++++++++++++++++++++
 .../jaxrs/reactive/JAXRSRxJava2ObservableTest.java |  71 ++++++++
 .../jaxrs/reactive/JAXRSRxJava2SingleTest.java     | 113 +++++++++++++
 .../jaxrs/reactive/RxJava2FlowableServer.java      |   2 +
 .../jaxrs/reactive/RxJava2FlowableService.java     |  74 +++++++++
 .../jaxrs/reactive/RxJava2ObservableServer.java    |   5 +-
 .../jaxrs/reactive/RxJava2ObservableService.java   |  27 ++-
 ...ervableServer.java => RxJava2SingleServer.java} |  18 +-
 ...vableService.java => RxJava2SingleService.java} |  59 ++++---
 11 files changed, 537 insertions(+), 36 deletions(-)

diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java
index e57bb39..c9a89a0 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java
@@ -20,6 +20,7 @@ package org.apache.cxf.jaxrs.rx2.server;
 
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.jaxrs.reactivestreams.server.ResponseStatusOnlyExceptionMapper;
 import org.apache.cxf.service.invoker.Invoker;
 
 public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
@@ -31,6 +32,7 @@ public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
         if (useStreamingSubscriber != null) {
             invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
         }
+        bean.setProvider(new ResponseStatusOnlyExceptionMapper());
         return invoker;
     }
 }
diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
index 35c0b08..4b87432 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
@@ -18,6 +18,8 @@
  */
 package org.apache.cxf.jaxrs.rx2.server;
 
+import java.util.Collections;
+
 import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
 import org.apache.cxf.message.Message;
@@ -51,7 +53,7 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
     protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
         if (!isStreamingSubscriberUsed(f, asyncResponse, inMessage)) {
-            Disposable d = f.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+            Disposable d = subscribe(f, asyncResponse);
             if (d == null) {
                 throw new IllegalStateException("Subscribe did not return a Disposable");
             }
@@ -61,11 +63,22 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
     
     protected AsyncResponseImpl handleObservable(Message inMessage, Observable<?> obs) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
-        Disposable d = obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
+        Disposable d = subscribe(obs, asyncResponse);
         if (d == null) {
             throw new IllegalStateException("Subscribe did not return a Disposable");
         }
         return asyncResponse;
     }
 
+    private <T> Disposable subscribe(final Flowable<T> f, final AsyncResponseImpl asyncResponse) {
+        return f
+            .switchIfEmpty(Flowable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
+            .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+    }
+
+    private <T> Disposable subscribe(final Observable<T> obs, final AsyncResponseImpl asyncResponse) {
+        return obs
+            .switchIfEmpty(Observable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
+            .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+    }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
index af3cf2b..2b54c8f 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
@@ -23,11 +23,15 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
+import javax.ws.rs.InternalServerErrorException;
 import javax.ws.rs.NotFoundException;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.Invocation;
 import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.xml.ws.Holder;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
@@ -40,6 +44,7 @@ import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 
 import io.reactivex.Flowable;
 import io.reactivex.disposables.Disposable;
+import io.reactivex.subscribers.TestSubscriber;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -168,4 +173,184 @@ public class JAXRSRxJava2FlowableTest extends AbstractBusClientServerTestBase {
     private void assertDuplicateResponse(String s) {
         assertEquals("Hello, world!Hello, world!", s);
     }
+    
+    @Test
+    public void testGetHelloWorldEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/flowable/empty";
+
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+            .assertComplete();
+    }
+
+    @Test
+    public void testGetHelloWorldEmpty2() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx22/flowable/empty";
+
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+            .assertComplete();
+    }
+
+    @Test
+    public void testFlowableImmediateErrors() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx22/flowable/immediate/errors";
+
+        final Flowable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+
+        final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+
+    @Test
+    public void testFlowableErrorWithExceptionMapperReturnsNoExceptionPayload() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx22/flowable/mixed/error";
+
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        // The response should not include the exception payload (injected by exception mapper)
+        // if some elements have been emitted before
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && !r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
+
+    @Test
+    public void testFlowableErrorWithExceptionMapperReturnsContentPayload() throws Exception {
+        GenericType<List<HelloWorldBean>> helloWorldBeanListType = new GenericType<List<HelloWorldBean>>() {  };
+        String address = "http://localhost:" + PORT + "/rx22/flowable/mixed/error";
+
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        // The response should include the emitted elements prior to the error
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && r.readEntity(helloWorldBeanListType).size() == 4)
+            .assertComplete();
+    }
+
+    @Test
+    public void testFlowableErrorsResponseWithMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx22/flowable/mapper/errors";
+
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 400)
+            .assertComplete();
+    }
+
+    @Test
+    public void testFlowableErrorWithWebException() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx22/flowable/web/errors";
+
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        // The response should not include the exception payload (injected by exception mapper)
+        // if some elements have been emitted before
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 403 && !r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
+
+    @Test
+    public void testFlowableImmediateErrorsWithExceptionMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx22/flowable/immediate/mapper/errors";
+
+        final Flowable<Response> obs = ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new FlowableRxInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(FlowableRxInvoker.class)
+                .get();
+
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
index 081a518..53b23df 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
@@ -22,8 +22,13 @@ package org.apache.cxf.systest.jaxrs.reactive;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.xml.ws.Holder;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
@@ -36,6 +41,7 @@ import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 
 import io.reactivex.Observable;
 import io.reactivex.disposables.Disposable;
+import io.reactivex.observers.TestObserver;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -87,6 +93,71 @@ public class JAXRSRxJava2ObservableTest extends AbstractBusClientServerTestBase
         doTestGetHelloWorldJsonList(address);
     } 
     
+    @Test
+    public void testGetHelloWorldEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/observable/empty";
+
+        final Observable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new ObservableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(ObservableRxInvoker.class)
+            .get();
+
+        final TestObserver<Response> subscriber = new TestObserver<>();
+        obs.subscribe(subscriber);
+
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+            .assertComplete();
+    }
+
+    @Test
+    public void testObservableImmediateErrors() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/observable/immediate/errors";
+
+        final Observable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new ObservableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(ObservableRxInvoker.class)
+            .get(HelloWorldBean.class);
+
+        final TestObserver<HelloWorldBean> subscriber = new TestObserver<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+
+    @Test
+    public void testObservableImmediateErrorsWithExceptionMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/observable/immediate/mapper/errors";
+
+        final Observable<Response> obs = ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ObservableRxInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ObservableRxInvoker.class)
+                .get();
+
+        final TestObserver<Response> subscriber = new TestObserver<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
+
     private void doTestGetHelloWorldJsonList(String address) throws Exception {
         WebClient wc = WebClient.create(address,
                                         Collections.singletonList(new JacksonJsonProvider()));
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2SingleTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2SingleTest.java
new file mode 100644
index 0000000..91fd900
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2SingleTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.MediaType;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.Flowable;
+import io.reactivex.subscribers.TestSubscriber;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JAXRSRxJava2SingleTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = RxJava2SingleServer.PORT;
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly", launchServer(RxJava2SingleServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/single/textJson";
+
+        final Flowable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+        
+        final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+            .assertComplete();
+    }
+
+    @Test
+    public void testGetString() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/single/textAsync";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.TEXT_PLAIN)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "Hello, world!".equals(r))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testGetError() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/single/error";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
index 2b18c83..76a8b40 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
@@ -51,6 +51,8 @@ public class RxJava2FlowableServer extends AbstractBusTestServerBase {
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
         sf.getProperties(true).put("useStreamingSubscriber", useStreamingSubscriber);
         sf.setProvider(new JacksonJsonProvider());
+        sf.setProvider(new IllegalArgumentExceptionMapper());
+        sf.setProvider(new IllegalStateExceptionMapper());
         new ReactiveIOCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(RxJava2FlowableService.class);
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
index b89f368..2571ac6 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
@@ -24,11 +24,14 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import javax.ws.rs.ForbiddenException;
 import javax.ws.rs.GET;
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
 
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
 import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
@@ -123,6 +126,77 @@ public class RxJava2FlowableService {
 
     }
     
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/empty")
+    public Flowable<HelloWorldBean> empty() { 
+        return Flowable.empty(); 
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/mapper/errors")
+    public Flowable<HelloWorldBean> mapperErrors() { 
+        return Flowable 
+            .range(1, 3) 
+            .flatMap(item -> { 
+                if (item < 3) { 
+                    return Flowable.just(new HelloWorldBean("Person " + item)); 
+                } else { 
+                    return Flowable.error(new IllegalArgumentException("Oops")); 
+                } 
+            }); 
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/web/errors")
+    public Flowable<HelloWorldBean> webErrors() { 
+        return Flowable 
+            .range(1, 3) 
+            .concatMap(item -> { 
+                if (item < 3) { 
+                    return Flowable.just(new HelloWorldBean("Person " + item)); 
+                } else { 
+                    return Flowable.error(new ForbiddenException("Oops")); 
+                } 
+            }); 
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/errors")
+    public Flowable<HelloWorldBean> immediateErrors() { 
+        return Flowable 
+            .range(1, 2) 
+            .flatMap(item -> Flowable.error(new RuntimeException("Oops"))); 
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/mapper/errors")
+    public Flowable<HelloWorldBean> immediateMapperErrors() { 
+        return Flowable 
+            .range(1, 2) 
+            .flatMap(item -> Flowable.error(new IllegalStateException("Oops"))); 
+    }
+
+    @GET
+    @Path("/mixed/error")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Flowable<HelloWorldBean> errorAndData() {
+        return Flowable
+            .range(1, 5)
+            .flatMap(item -> {
+                if (item <= 4) {
+                    return Flowable.just(new HelloWorldBean(" of Item: " + item));
+                } else {
+                    return Flowable.error(new NotFoundException("Item not found"));
+                }
+            })
+            .onErrorResumeNext((Throwable e) -> Flowable.error(new IllegalStateException("Oops", e)));
+    }
+    
     private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
 
         private StringBuilder sb = new StringBuilder();
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
index a8849d1..294649b 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
@@ -26,7 +26,7 @@ import org.apache.cxf.BusFactory;
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker;
+import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
@@ -42,8 +42,9 @@ public class RxJava2ObservableServer extends AbstractBusTestServerBase {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setInvoker(new ReactiveIOInvoker());
         sf.setProvider(new JacksonJsonProvider());
+        sf.setProvider(new IllegalStateExceptionMapper());
+        new ReactiveIOCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(RxJava2ObservableService.class);
         sf.setResourceProvider(RxJava2ObservableService.class,
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
index f08147d..362c1ce 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
@@ -25,6 +25,7 @@ import java.util.List;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
 
 import io.reactivex.Observable;
 
@@ -54,5 +55,29 @@ public class RxJava2ObservableService {
         bean2.setGreeting("Ciao");
         return Observable.just(Arrays.asList(bean1, bean2));
     }
-  
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/empty")
+    public Observable<HelloWorldBean> empty() { 
+        return Observable.empty(); 
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/errors")
+    public Observable<HelloWorldBean> immediateErrors() { 
+        return Observable 
+            .range(1, 2) 
+            .flatMap(item -> Observable.error(new RuntimeException("Oops"))); 
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/mapper/errors")
+    public Observable<HelloWorldBean> immediateMapperErrors() { 
+        return Observable 
+            .range(1, 2) 
+            .flatMap(item -> Observable.error(new IllegalStateException("Oops"))); 
+    }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleServer.java
similarity index 80%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleServer.java
index a8849d1..d3d881d 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleServer.java
@@ -26,15 +26,15 @@ import org.apache.cxf.BusFactory;
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker;
+import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
-public class RxJava2ObservableServer extends AbstractBusTestServerBase {
-    public static final String PORT = allocatePort(RxJava2ObservableServer.class);
+public class RxJava2SingleServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJava2SingleServer.class);
 
     org.apache.cxf.endpoint.Server server;
-    public RxJava2ObservableServer() {
+    public RxJava2SingleServer() {
     }
 
     protected void run() {
@@ -42,12 +42,12 @@ public class RxJava2ObservableServer extends AbstractBusTestServerBase {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setInvoker(new ReactiveIOInvoker());
         sf.setProvider(new JacksonJsonProvider());
+        new ReactiveIOCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
-        sf.setResourceClasses(RxJava2ObservableService.class);
-        sf.setResourceProvider(RxJava2ObservableService.class,
-                               new SingletonResourceProvider(new RxJava2ObservableService(), true));
+        sf.setResourceClasses(RxJava2SingleService.class);
+        sf.setResourceProvider(RxJava2SingleService.class,
+                               new SingletonResourceProvider(new RxJava2SingleService(), true));
         sf.setAddress("http://localhost:" + PORT + "/");
         server = sf.create();
     }
@@ -60,7 +60,7 @@ public class RxJava2ObservableServer extends AbstractBusTestServerBase {
 
     public static void main(String[] args) {
         try {
-            RxJava2ObservableServer s = new RxJava2ObservableServer();
+            RxJava2SingleServer s = new RxJava2SingleServer();
             s.start();
         } catch (Exception ex) {
             ex.printStackTrace();
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleService.java
similarity index 50%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleService.java
index f08147d..77a7302 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2SingleService.java
@@ -19,40 +19,55 @@
 
 package org.apache.cxf.systest.jaxrs.reactive;
 
-import java.util.Arrays;
-import java.util.List;
-
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+
+import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
+
+import io.reactivex.Single;
 
-import io.reactivex.Observable;
+@Path("/rx2/single")
+public class RxJava2SingleService {
 
-@Path("/rx2/observable")
-public class RxJava2ObservableService {
+    @GET
+    @Produces("application/json")
+    @Path("textJson")
+    public Single<HelloWorldBean> getJson() {
+        return Single.just(new HelloWorldBean());
+    }
 
     @GET
     @Produces("text/plain")
-    @Path("text")
-    public Observable<String> getText() {
-        return Observable.just("Hello, world!");
+    @Path("textAsync")
+    public void getTextAsync(@Suspended final AsyncResponse ar) {
+        final StringAsyncSubscriber subscriber = new StringAsyncSubscriber(ar);
+        
+        Single
+            .just("Hello, ")
+            .map(s -> s + "world!")
+            .subscribe(
+                s -> {
+                    subscriber.onNext(s);
+                    subscriber.onComplete();
+                },
+                subscriber::onError);
+
     }
     
     @GET
     @Produces("application/json")
-    @Path("textJson")
-    public Observable<HelloWorldBean> getJson() {
-        return Observable.just(new HelloWorldBean());
+    @Path("error")
+    public Single<HelloWorldBean> getError() {
+        return Single.error(new RuntimeException("Oops"));
     }
+
     
-    @GET
-    @Produces("application/json")
-    @Path("textJsonList")
-    public Observable<List<HelloWorldBean>> getJsonList() {
-        HelloWorldBean bean1 = new HelloWorldBean();
-        HelloWorldBean bean2 = new HelloWorldBean();
-        bean2.setGreeting("Ciao");
-        return Observable.just(Arrays.asList(bean1, bean2));
+    private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
+        StringAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
     }
-  
-}
+}
\ No newline at end of file