You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/10/23 22:03:48 UTC

[cxf] 01/02: CXF-8355: RxJava3: Observable / Flowable Returns Mixed Response on Errors and Hangs when Empty

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit ec0ad05e9720d8b4b893fc4ee2ca8ca1ed30cf7d
Author: reta <dr...@gmail.com>
AuthorDate: Sat Oct 10 18:22:38 2020 -0400

    CXF-8355: RxJava3: Observable / Flowable Returns Mixed Response on Errors and Hangs when Empty
    
    (cherry picked from commit 5fded377430dcfd461cf8714ec893cbf18f606a4)
---
 .../cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java |   2 +
 .../cxf/jaxrs/rx3/server/ReactiveIOInvoker.java    |  17 +-
 .../reactive/IllegalArgumentExceptionMapper.java   |  22 +--
 .../reactive/IllegalStateExceptionMapper.java      |  25 ++-
 .../jaxrs/reactive/JAXRSRxJava3FlowableTest.java   | 183 +++++++++++++++++++++
 .../jaxrs/reactive/JAXRSRxJava3ObservableTest.java |  70 ++++++++
 .../jaxrs/reactive/JAXRSRxJava3SingleTest.java     | 113 +++++++++++++
 .../jaxrs/reactive/RxJava3FlowableServer.java      |   2 +
 .../jaxrs/reactive/RxJava3FlowableService.java     |  75 +++++++++
 .../jaxrs/reactive/RxJava3ObservableServer.java    |   5 +-
 .../jaxrs/reactive/RxJava3ObservableService.java   |  25 +++
 ...ervableServer.java => RxJava3SingleServer.java} |  18 +-
 ...vableService.java => RxJava3SingleService.java} |  60 ++++---
 13 files changed, 555 insertions(+), 62 deletions(-)

diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
index bf07eec..592b9e6 100644
--- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
+++ b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
@@ -20,6 +20,7 @@ package org.apache.cxf.jaxrs.rx3.server;
 
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.jaxrs.reactivestreams.server.ResponseStatusOnlyExceptionMapper;
 import org.apache.cxf.service.invoker.Invoker;
 
 public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
@@ -31,6 +32,7 @@ public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
         if (useStreamingSubscriber != null) {
             invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
         }
+        bean.setProvider(new ResponseStatusOnlyExceptionMapper());
         return invoker;
     }
 }
diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
index 6739092..4d6be4f 100644
--- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
+++ b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
@@ -18,6 +18,8 @@
  */
 package org.apache.cxf.jaxrs.rx3.server;
 
+import java.util.Collections;
+
 import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
 import org.apache.cxf.message.Message;
@@ -51,7 +53,7 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
     protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
         if (!isStreamingSubscriberUsed(f, asyncResponse, inMessage)) {
-            Disposable d = f.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+            Disposable d = subscribe(f, asyncResponse);
             if (d == null) {
                 throw new IllegalStateException("Subscribe did not return a Disposable");
             }
@@ -61,11 +63,22 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
     
     protected AsyncResponseImpl handleObservable(Message inMessage, Observable<?> obs) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
-        Disposable d = obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
+        Disposable d = subscribe(obs, asyncResponse);
         if (d == null) {
             throw new IllegalStateException("Subscribe did not return a Disposable");
         }
         return asyncResponse;
     }
 
+    private <T> Disposable subscribe(final Flowable<T> f, final AsyncResponseImpl asyncResponse) {
+        return f
+            .switchIfEmpty(Flowable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
+            .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+    }
+
+    private <T> Disposable subscribe(final Observable<T> obs, final AsyncResponseImpl asyncResponse) {
+        return obs
+            .switchIfEmpty(Observable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
+            .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+    }
 }
diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalArgumentExceptionMapper.java
similarity index 53%
copy from rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalArgumentExceptionMapper.java
index bf07eec..a1dc412 100644
--- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalArgumentExceptionMapper.java
@@ -16,21 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.cxf.jaxrs.rx3.server;
 
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
-import org.apache.cxf.service.invoker.Invoker;
+package org.apache.cxf.systest.jaxrs.reactive;
 
-public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+@Provider
+public class IllegalArgumentExceptionMapper implements ExceptionMapper<IllegalArgumentException> {
     @Override
-    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
-        Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true)
-                .getOrDefault("useStreamingSubscriber", null);
-        ReactiveIOInvoker invoker = new ReactiveIOInvoker();
-        if (useStreamingSubscriber != null) {
-            invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
-        }
-        return invoker;
+    public Response toResponse(IllegalArgumentException exception) {
+        return Response.status(400).build();
     }
 }
diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalStateExceptionMapper.java
similarity index 53%
copy from rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalStateExceptionMapper.java
index bf07eec..95ae19b 100644
--- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalStateExceptionMapper.java
@@ -16,21 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.cxf.jaxrs.rx3.server;
 
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
-import org.apache.cxf.service.invoker.Invoker;
+package org.apache.cxf.systest.jaxrs.reactive;
 
-public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+@Provider
+public class IllegalStateExceptionMapper implements ExceptionMapper<IllegalStateException> {
     @Override
-    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
-        Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true)
-                .getOrDefault("useStreamingSubscriber", null);
-        ReactiveIOInvoker invoker = new ReactiveIOInvoker();
-        if (useStreamingSubscriber != null) {
-            invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
-        }
-        return invoker;
+    public Response toResponse(IllegalStateException exception) {
+        return Response
+            .status(409)
+            .entity(exception)
+            .build();
     }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java
index 9df86bc..980129e 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java
@@ -24,10 +24,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import javax.ws.rs.InternalServerErrorException;
 import javax.ws.rs.NotFoundException;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.Invocation;
 import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
@@ -64,6 +67,186 @@ public class JAXRSRxJava3FlowableTest extends AbstractBusClientServerTestBase {
     }
 
     @Test
+    public void testGetHelloWorldEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/flowable/empty";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testGetHelloWorldEmpty2() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx33/flowable/empty";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testFlowableImmediateErrors() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx33/flowable/immediate/errors";
+        
+        final Flowable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+        
+        final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+    
+    @Test
+    public void testFlowableErrorWithExceptionMapperReturnsNoExceptionPayload() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx33/flowable/mixed/error";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+            
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        // The response should not include the exception payload (injected by exception mapper)
+        // if some elements have been emitted before
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && !r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testFlowableErrorWithExceptionMapperReturnsContentPayload() throws Exception {
+        GenericType<List<HelloWorldBean>> helloWorldBeanListType = new GenericType<List<HelloWorldBean>>() {  };
+        String address = "http://localhost:" + PORT + "/rx33/flowable/mixed/error";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        // The response should include the emitted elements prior the error
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && r.readEntity(helloWorldBeanListType).size() == 4)
+            .assertComplete();
+    }
+    
+    @Test
+    public void testFlowableErrorsResponseWithMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx33/flowable/mapper/errors";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 400)
+            .assertComplete();
+    }
+    
+    @Test
+    public void testFlowableErrorWithWebException() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx33/flowable/web/errors";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        // The response should not include the exception payload (injected by exception mapper)
+        // if some elements have been emitted before
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 403 && !r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testFlowableImmediateErrorsWithExceptionMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx33/flowable/immediate/mapper/errors";
+        
+        final Flowable<Response> obs = ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new FlowableRxInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(FlowableRxInvoker.class)
+                .get();
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
+    
+    @Test
     public void testGetHelloWorldJson() throws Exception {
         String address = "http://localhost:" + PORT + "/rx3/flowable/textJson";
         List<Object> providers = new LinkedList<>();
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java
index 6b66577..f1eb9f0 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java
@@ -22,8 +22,13 @@ package org.apache.cxf.systest.jaxrs.reactive;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.xml.ws.Holder;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
@@ -36,6 +41,7 @@ import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 
 import io.reactivex.rxjava3.core.Observable;
 import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.observers.TestObserver;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -88,7 +94,71 @@ public class JAXRSRxJava3ObservableTest extends AbstractBusClientServerTestBase
         String address = "http://localhost:" + PORT + "/rx3/observable/textJsonList";
         doTestGetHelloWorldJsonList(address);
     }
+    
+    @Test
+    public void testGetHelloWorldEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/observable/empty";
+        
+        final Observable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new ObservableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(ObservableRxInvoker.class)
+            .get();
+        
+        final TestObserver<Response> subscriber = new TestObserver<>();
+        obs.subscribe(subscriber);
 
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testObservableImmediateErrors() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/observable/immediate/errors";
+        
+        final Observable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new ObservableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(ObservableRxInvoker.class)
+            .get(HelloWorldBean.class);
+        
+        final TestObserver<HelloWorldBean> subscriber = new TestObserver<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+    
+    @Test
+    public void testObservableImmediateErrorsWithExceptionMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/observable/immediate/mapper/errors";
+        
+        final Observable<Response> obs = ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ObservableRxInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ObservableRxInvoker.class)
+                .get();
+        
+        final TestObserver<Response> subscriber = new TestObserver<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
+    
     private void doTestGetHelloWorldJsonList(String address) throws Exception {
         WebClient wc = WebClient.create(address,
                                         Collections.singletonList(new JacksonJsonProvider()));
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3SingleTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3SingleTest.java
new file mode 100644
index 0000000..563cfcd
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3SingleTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.MediaType;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.subscribers.TestSubscriber;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JAXRSRxJava3SingleTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = RxJava3SingleServer.PORT;
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly", launchServer(RxJava3SingleServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/single/textJson";
+
+        final Flowable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+        
+        final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+            .assertComplete();
+    }
+
+    @Test
+    public void testGetString() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/single/textAsync";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.TEXT_PLAIN)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "Hello, world!".equals(r))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testGetError() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/single/error";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java
index f78d918..109c2a2 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java
@@ -51,6 +51,8 @@ public class RxJava3FlowableServer extends AbstractBusTestServerBase {
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
         sf.getProperties(true).put("useStreamingSubscriber", useStreamingSubscriber);
         sf.setProvider(new JacksonJsonProvider());
+        sf.setProvider(new IllegalArgumentExceptionMapper());
+        sf.setProvider(new IllegalStateExceptionMapper());
         new ReactiveIOCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(RxJava3FlowableService.class);
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java
index af67f50..618d3e4 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java
@@ -24,14 +24,18 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import javax.ws.rs.ForbiddenException;
 import javax.ws.rs.GET;
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
 
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
 import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
+import org.apache.cxf.systest.jaxrs.reactor.HelloWorldBean;
 
 import io.reactivex.rxjava3.core.BackpressureStrategy;
 import io.reactivex.rxjava3.core.Flowable;
@@ -51,6 +55,77 @@ public class RxJava3FlowableService {
     }
     
     @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/empty")
+    public Flowable<HelloWorldBean> empty() { 
+        return Flowable.empty(); 
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/mapper/errors")
+    public Flowable<HelloWorldBean> mapperErrors() { 
+        return Flowable 
+            .range(1, 3) 
+            .flatMap(item -> { 
+                if (item < 3) { 
+                    return Flowable.just(new HelloWorldBean("Person " + item)); 
+                } else { 
+                    return Flowable.error(new IllegalArgumentException("Oops")); 
+                } 
+            }); 
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/web/errors")
+    public Flowable<HelloWorldBean> webErrors() { 
+        return Flowable 
+            .range(1, 3) 
+            .concatMap(item -> { 
+                if (item < 3) { 
+                    return Flowable.just(new HelloWorldBean("Person " + item)); 
+                } else { 
+                    return Flowable.error(new ForbiddenException("Oops")); 
+                } 
+            }); 
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/errors")
+    public Flowable<HelloWorldBean> immediateErrors() { 
+        return Flowable 
+            .range(1, 2) 
+            .flatMap(item -> Flowable.error(new RuntimeException("Oops"))); 
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/mapper/errors")
+    public Flowable<HelloWorldBean> immediateMapperErrors() { 
+        return Flowable 
+            .range(1, 2) 
+            .flatMap(item -> Flowable.error(new IllegalStateException("Oops"))); 
+    }
+    
+    @GET
+    @Path("/mixed/error")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Flowable<HelloWorldBean> errorAndData() {
+        return Flowable
+            .range(1, 5)
+            .flatMap(item -> {
+                if (item <= 4) {
+                    return Flowable.just(new HelloWorldBean(" of Item: " + item));
+                } else {
+                    return Flowable.error(new NotFoundException("Item not found"));
+                }
+            })
+            .onErrorResumeNext(e -> Flowable.error(new IllegalStateException("Oops", e)));
+    }
+    
+    @GET
     @Produces("application/json")
     @Path("textJsonImplicitListAsync")
     public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
index 0a8fcaa..6365bd8 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
@@ -26,7 +26,7 @@ import org.apache.cxf.BusFactory;
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.rx3.server.ReactiveIOInvoker;
+import org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
@@ -42,8 +42,9 @@ public class RxJava3ObservableServer extends AbstractBusTestServerBase {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setInvoker(new ReactiveIOInvoker());
         sf.setProvider(new JacksonJsonProvider());
+        sf.setProvider(new IllegalStateExceptionMapper());
+        new ReactiveIOCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(RxJava3ObservableService.class);
         sf.setResourceProvider(RxJava3ObservableService.class,
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
index 51d362c..7393d40 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
@@ -25,6 +25,7 @@ import java.util.List;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
 
 import io.reactivex.rxjava3.core.Observable;
 
@@ -55,4 +56,28 @@ public class RxJava3ObservableService {
         return Observable.just(Arrays.asList(bean1, bean2));
     }
   
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/empty")
+    public Observable<HelloWorldBean> empty() { 
+        return Observable.empty(); 
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/errors")
+    public Observable<HelloWorldBean> immediateErrors() { 
+        return Observable 
+            .range(1, 2) 
+            .flatMap(item -> Observable.error(new RuntimeException("Oops"))); 
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/mapper/errors")
+    public Observable<HelloWorldBean> immediateMapperErrors() { 
+        return Observable 
+            .range(1, 2) 
+            .flatMap(item -> Observable.error(new IllegalStateException("Oops"))); 
+    }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleServer.java
similarity index 80%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleServer.java
index 0a8fcaa..4b91c33b 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleServer.java
@@ -26,15 +26,15 @@ import org.apache.cxf.BusFactory;
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.rx3.server.ReactiveIOInvoker;
+import org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
-public class RxJava3ObservableServer extends AbstractBusTestServerBase {
-    public static final String PORT = allocatePort(RxJava3ObservableServer.class);
+public class RxJava3SingleServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJava3SingleServer.class);
 
     org.apache.cxf.endpoint.Server server;
-    public RxJava3ObservableServer() {
+    public RxJava3SingleServer() {
     }
 
     protected void run() {
@@ -42,12 +42,12 @@ public class RxJava3ObservableServer extends AbstractBusTestServerBase {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setInvoker(new ReactiveIOInvoker());
         sf.setProvider(new JacksonJsonProvider());
+        new ReactiveIOCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
-        sf.setResourceClasses(RxJava3ObservableService.class);
-        sf.setResourceProvider(RxJava3ObservableService.class,
-                               new SingletonResourceProvider(new RxJava3ObservableService(), true));
+        sf.setResourceClasses(RxJava3SingleService.class);
+        sf.setResourceProvider(RxJava3SingleService.class,
+                               new SingletonResourceProvider(new RxJava3SingleService(), true));
         sf.setAddress("http://localhost:" + PORT + "/");
         server = sf.create();
     }
@@ -60,7 +60,7 @@ public class RxJava3ObservableServer extends AbstractBusTestServerBase {
 
     public static void main(String[] args) {
         try {
-            RxJava3ObservableServer s = new RxJava3ObservableServer();
+            RxJava3SingleServer s = new RxJava3SingleServer();
             s.start();
         } catch (Exception ex) {
             ex.printStackTrace();
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleService.java
similarity index 50%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleService.java
index 51d362c..9fc0797 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleService.java
@@ -19,40 +19,54 @@
 
 package org.apache.cxf.systest.jaxrs.reactive;
 
-import java.util.Arrays;
-import java.util.List;
-
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
 
-import io.reactivex.rxjava3.core.Observable;
+import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
 
-@Path("/rx3/observable")
-public class RxJava3ObservableService {
+import io.reactivex.rxjava3.core.Single;
+
+@Path("/rx3/single")
+public class RxJava3SingleService {
 
-    @GET
-    @Produces("text/plain")
-    @Path("text")
-    public Observable<String> getText() {
-        return Observable.just("Hello, world!");
-    }
-    
     @GET
     @Produces("application/json")
     @Path("textJson")
-    public Observable<HelloWorldBean> getJson() {
-        return Observable.just(new HelloWorldBean());
+    public Single<HelloWorldBean> getJson() {
+        return Single.just(new HelloWorldBean());
+    }
+
+    @GET
+    @Produces("text/plain")
+    @Path("textAsync")
+    public void getTextAsync(@Suspended final AsyncResponse ar) {
+        final StringAsyncSubscriber subscriber = new StringAsyncSubscriber(ar);
+        
+        Single
+            .just("Hello, ")
+            .map(s -> s + "world!")
+            .subscribe(
+                s -> {
+                    subscriber.onNext(s);
+                    subscriber.onComplete();
+                },
+                subscriber::onError);
+
     }
     
     @GET
     @Produces("application/json")
-    @Path("textJsonList")
-    public Observable<List<HelloWorldBean>> getJsonList() {
-        HelloWorldBean bean1 = new HelloWorldBean();
-        HelloWorldBean bean2 = new HelloWorldBean();
-        bean2.setGreeting("Ciao");
-        return Observable.just(Arrays.asList(bean1, bean2));
+    @Path("error")
+    public Single<HelloWorldBean> getError() {
+        return Single.error(new RuntimeException("Oops"));
+    }
+
+    private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
+        StringAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
     }
-  
-}
+}
\ No newline at end of file