You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/11/26 01:28:14 UTC

[cxf] 01/01: CXF-8357: RxJava2/RxJava3: Add support of Maybe type

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch CXF-8357
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 636e5bbdbee64e8d42f2f1e434fa062db75aa88a
Author: reta <dr...@gmail.com>
AuthorDate: Tue Nov 24 19:58:29 2020 -0500

    CXF-8357: RxJava2/RxJava3: Add support of Maybe type
---
 .../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java    |  18 +++
 .../cxf/jaxrs/rx3/server/ReactiveIOInvoker.java    |  18 +++
 .../jaxrs/reactive/JAXRSRxJava2MaybeTest.java      | 136 +++++++++++++++++++++
 .../jaxrs/reactive/JAXRSRxJava3MaybeTest.java      | 136 +++++++++++++++++++++
 .../systest/jaxrs/reactive/RxJava2MaybeServer.java |  73 +++++++++++
 .../RxJava2MaybeService.java}                      |  53 ++++----
 .../systest/jaxrs/reactive/RxJava3MaybeServer.java |  73 +++++++++++
 .../RxJava3MaybeService.java}                      |  53 ++++----
 .../cxf/systest/jaxrs/reactor/MonoReactorTest.java |  18 +++
 .../cxf/systest/jaxrs/reactor/MonoService.java     |   7 +-
 10 files changed, 536 insertions(+), 49 deletions(-)

diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
index 4b87432..4bf5a43 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
@@ -25,6 +25,7 @@ import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
 import org.apache.cxf.message.Message;
 
 import io.reactivex.Flowable;
+import io.reactivex.Maybe;
 import io.reactivex.Observable;
 import io.reactivex.Single;
 import io.reactivex.disposables.Disposable;
@@ -37,10 +38,21 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
             return handleSingle(inMessage, (Single<?>)result);
         } else if (result instanceof Observable) {
             return handleObservable(inMessage, (Observable<?>)result);
+        } else if (result instanceof Maybe) {
+            return handleMaybe(inMessage, (Maybe<?>)result);
         }
         return null;
     }
     
+    protected AsyncResponseImpl handleMaybe(Message inMessage, Maybe<?> maybe) { // adapts a Maybe result to an async response
+        final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); // response bound to the incoming message
+        Disposable d = subscribe(maybe, asyncResponse); // wires the value, empty and error outcomes to asyncResponse
+        if (d == null) { // defensive guard: fail fast when no subscription handle came back
+            throw new IllegalStateException("Subscribe did not return a Disposable");
+        }
+        return asyncResponse;
+    }
+    
     protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
         Disposable d = single.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
@@ -81,4 +93,10 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
             .switchIfEmpty(Observable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
             .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
     }
+    
+    private <T> Disposable subscribe(Maybe<T> maybe, final AsyncResponseImpl asyncResponse) { // routes each outcome to asyncResponse
+        return maybe // NOTE(review): subscribe(onSuccess, onError, onComplete) might avoid the switchIfEmpty side effect - confirm
+            .switchIfEmpty(Maybe.<T>empty().doOnComplete(() -> asyncResponse.resume(null))) // empty source: resume with null
+            .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); // value: resume; error: handleThrowable
+    }
 }
diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
index 4d6be4f..c9162d9 100644
--- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
+++ b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
@@ -25,6 +25,7 @@ import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
 import org.apache.cxf.message.Message;
 
 import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
 import io.reactivex.rxjava3.core.Observable;
 import io.reactivex.rxjava3.core.Single;
 import io.reactivex.rxjava3.disposables.Disposable;
@@ -37,10 +38,21 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
             return handleSingle(inMessage, (Single<?>)result);
         } else if (result instanceof Observable) {
             return handleObservable(inMessage, (Observable<?>)result);
+        } else if (result instanceof Maybe) {
+            return handleMaybe(inMessage, (Maybe<?>)result);
         }
         return null;
     }
     
+    protected AsyncResponseImpl handleMaybe(Message inMessage, Maybe<?> maybe) { // adapts a Maybe result to an async response
+        final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); // response bound to the incoming message
+        Disposable d = subscribe(maybe, asyncResponse); // wires the value, empty and error outcomes to asyncResponse
+        if (d == null) { // defensive guard: fail fast when no subscription handle came back
+            throw new IllegalStateException("Subscribe did not return a Disposable");
+        }
+        return asyncResponse;
+    }
+    
     protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
         Disposable d = single.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
@@ -81,4 +93,10 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
             .switchIfEmpty(Observable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
             .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
     }
+    
+    private <T> Disposable subscribe(Maybe<T> maybe, final AsyncResponseImpl asyncResponse) { // routes each outcome to asyncResponse
+        return maybe // NOTE(review): subscribe(onSuccess, onError, onComplete) might avoid the switchIfEmpty side effect - confirm
+            .switchIfEmpty(Maybe.<T>empty().doOnComplete(() -> asyncResponse.resume(null))) // empty source: resume with null
+            .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); // value: resume; error: handleThrowable
+    }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2MaybeTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2MaybeTest.java
new file mode 100644
index 0000000..f4f351e
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2MaybeTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.Flowable;
+import io.reactivex.subscribers.TestSubscriber;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JAXRSRxJava2MaybeTest extends AbstractBusClientServerTestBase { // client-side checks of the /rx2/maybe endpoints
+    public static final String PORT = RxJava2MaybeServer.PORT; // same port the test server allocates
+    @BeforeClass
+    public static void startServers() throws Exception { // launches RxJava2MaybeServer once for all tests
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly", launchServer(RxJava2MaybeServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldJson() throws Exception { // expects the JSON bean from /textJson
+        String address = "http://localhost:" + PORT + "/rx2/maybe/textJson";
+
+        final Flowable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+        
+        final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS); // NOTE(review): boolean result ignored; a timeout only shows up in the asserts below
+        subscriber
+            .assertValue(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+            .assertComplete();
+    }
+
+    @Test
+    public void testGetString() throws Exception { // expects the plain-text greeting from /textAsync
+        String address = "http://localhost:" + PORT + "/rx2/maybe/textAsync";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.TEXT_PLAIN)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "Hello, world!".equals(r))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testGetError() throws Exception { // expects /error to surface as InternalServerErrorException on the client
+        String address = "http://localhost:" + PORT + "/rx2/maybe/error";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+    
+    @Test
+    public void testGetHelloWorldEmpty() throws Exception { // expects /empty to produce a response without an entity
+        String address = "http://localhost:" + PORT + "/rx2/maybe/empty";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(); // no entity type requested, so the raw Response is emitted
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> !r.hasEntity())
+            .assertComplete();
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3MaybeTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3MaybeTest.java
new file mode 100644
index 0000000..1c4c95e
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3MaybeTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.subscribers.TestSubscriber;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JAXRSRxJava3MaybeTest extends AbstractBusClientServerTestBase { // client-side checks of the /rx3/maybe endpoints
+    public static final String PORT = RxJava3MaybeServer.PORT; // same port the test server allocates
+    @BeforeClass
+    public static void startServers() throws Exception { // launches RxJava3MaybeServer once for all tests
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly", launchServer(RxJava3MaybeServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldJson() throws Exception { // expects the JSON bean from /textJson
+        String address = "http://localhost:" + PORT + "/rx3/maybe/textJson";
+
+        final Flowable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+        
+        final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS); // NOTE(review): boolean result ignored; a timeout only shows up in the asserts below
+        subscriber
+            .assertValue(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+            .assertComplete();
+    }
+
+    @Test
+    public void testGetString() throws Exception { // expects the plain-text greeting from /textAsync
+        String address = "http://localhost:" + PORT + "/rx3/maybe/textAsync";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.TEXT_PLAIN)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "Hello, world!".equals(r))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testGetError() throws Exception { // expects /error to surface as InternalServerErrorException on the client
+        String address = "http://localhost:" + PORT + "/rx3/maybe/error";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+    
+    @Test
+    public void testGetHelloWorldEmpty() throws Exception { // expects /empty to produce a response without an entity
+        String address = "http://localhost:" + PORT + "/rx3/maybe/empty";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(); // no entity type requested, so the raw Response is emitted
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> !r.hasEntity())
+            .assertComplete();
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeServer.java
new file mode 100644
index 0000000..58a79a2
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeServer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+public class RxJava2MaybeServer extends AbstractBusTestServerBase { // test server hosting RxJava2MaybeService
+    public static final String PORT = allocatePort(RxJava2MaybeServer.class);
+
+    org.apache.cxf.endpoint.Server server; // assigned in run(), cleared in tearDown()
+    public RxJava2MaybeServer() {
+    }
+
+    protected void run() { // configures and creates the JAX-RS endpoint on PORT
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setProvider(new JacksonJsonProvider());
+        new ReactiveIOCustomizer().customize(sf); // NOTE(review): presumably installs the rx2 reactive invoker - confirm
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(RxJava2MaybeService.class);
+        sf.setResourceProvider(RxJava2MaybeService.class,
+                               new SingletonResourceProvider(new RxJava2MaybeService(), true));
+        sf.setAddress("http://localhost:" + PORT + "/");
+        server = sf.create();
+    }
+
+    public void tearDown() throws Exception { // stops and releases the endpoint created by run()
+        server.stop(); // NOTE(review): no null check; server is only set once run() has reached sf.create()
+        server.destroy();
+        server = null;
+    }
+
+    public static void main(String[] args) { // standalone launcher for manual runs
+        try {
+            RxJava2MaybeServer s = new RxJava2MaybeServer();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1); // NOTE(review): exit halts the JVM, so the finally block is not expected to run on this path
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeService.java
similarity index 63%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeService.java
index 300def8..71e37b8 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeService.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.cxf.systest.jaxrs.reactor;
+package org.apache.cxf.systest.jaxrs.reactive;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -27,47 +27,52 @@ import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
-import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
+import org.apache.cxf.systest.jaxrs.reactor.HelloWorldBean;
 
-@Path("/mono")
-public class MonoService {
+import io.reactivex.Maybe;
 
-    @GET
-    @Produces("application/json")
-    @Path("textJson")
-    public Mono<HelloWorldBean> getJson() {
-        return Mono.just(new HelloWorldBean());
-    }
+@Path("/rx2/maybe")
+public class RxJava2MaybeService {
 
     @GET
     @Produces("application/json")
-    @Path("textJsonImplicitListAsyncStream")
-    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
-        Mono.just("Hello")
-                .map(HelloWorldBean::new)
-                .subscribeOn(Schedulers.elastic())
-                .subscribe(new JsonStreamingAsyncSubscriber<>(ar, null, null, null, 1000L, 0L));
+    @Path("textJson")
+    public Maybe<HelloWorldBean> getJson() { // resource method returning a Maybe directly
+        return Maybe.just(new HelloWorldBean()); // single-value Maybe holding a default-constructed bean
     }
 
     @GET
     @Produces("text/plain")
     @Path("textAsync")
     public void getTextAsync(@Suspended final AsyncResponse ar) {
-        Mono.just("Hello, ").map(s -> s + "world!")
-                .subscribe(new StringAsyncSubscriber(ar));
+        final StringAsyncSubscriber subscriber = new StringAsyncSubscriber(ar); // wraps the suspended AsyncResponse
+        
+        Maybe
+            .just("Hello, ")
+            .map(s -> s + "world!")
+            .subscribe(
+                s -> { // on success, drive both subscriber callbacks by hand
+                    subscriber.onNext(s);
+                    subscriber.onComplete();
+                },
+                subscriber::onError); // NOTE(review): no empty-completion handler; harmless here because the source is Maybe.just
 
     }
     
     @GET
+    @Produces("application/json")
+    @Path("error")
+    public Maybe<HelloWorldBean> getError() { // endpoint whose Maybe always fails
+        return Maybe.error(new RuntimeException("Oops")); // Maybe that signals the RuntimeException instead of a value
+    }
+    
+    @GET
     @Produces(MediaType.APPLICATION_JSON)
-    @Path("/empty")
-    public Mono<HelloWorldBean> empty() { 
-        return Mono.empty(); 
+    @Path("empty")
+    public Maybe<HelloWorldBean> empty() { // endpoint whose Maybe completes without a value
+        return Maybe.empty(); 
     }
 
-
     private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
         StringAsyncSubscriber(AsyncResponse ar) {
             super(ar);
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeServer.java
new file mode 100644
index 0000000..550f989
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeServer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+public class RxJava3MaybeServer extends AbstractBusTestServerBase { // test server hosting RxJava3MaybeService
+    public static final String PORT = allocatePort(RxJava3MaybeServer.class);
+
+    org.apache.cxf.endpoint.Server server; // assigned in run(), cleared in tearDown()
+    public RxJava3MaybeServer() {
+    }
+
+    protected void run() { // configures and creates the JAX-RS endpoint on PORT
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setProvider(new JacksonJsonProvider());
+        new ReactiveIOCustomizer().customize(sf); // NOTE(review): presumably installs the rx3 reactive invoker - confirm
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(RxJava3MaybeService.class);
+        sf.setResourceProvider(RxJava3MaybeService.class,
+                               new SingletonResourceProvider(new RxJava3MaybeService(), true));
+        sf.setAddress("http://localhost:" + PORT + "/");
+        server = sf.create();
+    }
+
+    public void tearDown() throws Exception { // stops and releases the endpoint created by run()
+        server.stop(); // NOTE(review): no null check; server is only set once run() has reached sf.create()
+        server.destroy();
+        server = null;
+    }
+
+    public static void main(String[] args) { // standalone launcher for manual runs
+        try {
+            RxJava3MaybeServer s = new RxJava3MaybeServer();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1); // NOTE(review): exit halts the JVM, so the finally block is not expected to run on this path
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeService.java
similarity index 63%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeService.java
index 300def8..4673034 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeService.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.cxf.systest.jaxrs.reactor;
+package org.apache.cxf.systest.jaxrs.reactive;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -27,47 +27,52 @@ import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
-import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
+import org.apache.cxf.systest.jaxrs.reactor.HelloWorldBean;
 
-@Path("/mono")
-public class MonoService {
+import io.reactivex.rxjava3.core.Maybe;
 
-    @GET
-    @Produces("application/json")
-    @Path("textJson")
-    public Mono<HelloWorldBean> getJson() {
-        return Mono.just(new HelloWorldBean());
-    }
+@Path("/rx3/maybe")
+public class RxJava3MaybeService {
 
     @GET
     @Produces("application/json")
-    @Path("textJsonImplicitListAsyncStream")
-    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
-        Mono.just("Hello")
-                .map(HelloWorldBean::new)
-                .subscribeOn(Schedulers.elastic())
-                .subscribe(new JsonStreamingAsyncSubscriber<>(ar, null, null, null, 1000L, 0L));
+    @Path("textJson")
+    public Maybe<HelloWorldBean> getJson() { // resource method returning a Maybe directly
+        return Maybe.just(new HelloWorldBean()); // single-value Maybe holding a default-constructed bean
     }
 
     @GET
     @Produces("text/plain")
     @Path("textAsync")
     public void getTextAsync(@Suspended final AsyncResponse ar) {
-        Mono.just("Hello, ").map(s -> s + "world!")
-                .subscribe(new StringAsyncSubscriber(ar));
+        final StringAsyncSubscriber subscriber = new StringAsyncSubscriber(ar); // wraps the suspended AsyncResponse
+        
+        Maybe
+            .just("Hello, ")
+            .map(s -> s + "world!")
+            .subscribe(
+                s -> { // on success, drive both subscriber callbacks by hand
+                    subscriber.onNext(s);
+                    subscriber.onComplete();
+                },
+                subscriber::onError); // NOTE(review): no empty-completion handler; harmless here because the source is Maybe.just
 
     }
     
     @GET
+    @Produces("application/json")
+    @Path("error")
+    public Maybe<HelloWorldBean> getError() { // endpoint whose Maybe always fails
+        return Maybe.error(new RuntimeException("Oops")); // Maybe that signals the RuntimeException instead of a value
+    }
+    
+    @GET
     @Produces(MediaType.APPLICATION_JSON)
-    @Path("/empty")
-    public Mono<HelloWorldBean> empty() { 
-        return Mono.empty(); 
+    @Path("empty")
+    public Maybe<HelloWorldBean> empty() { // endpoint whose Maybe completes without a value
+        return Maybe.empty(); 
     }
 
-
     private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
         StringAsyncSubscriber(AsyncResponse ar) {
             super(ar);
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
index 18054e3..bacac30 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.cxf.systest.jaxrs.reactor;
 
+import javax.ws.rs.InternalServerErrorException;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.MediaType;
 
@@ -112,4 +113,21 @@ public class MonoReactorTest extends AbstractBusClientServerTestBase {
             .expectComplete()
             .verify();
     }
+    
+    @Test
+    public void testGetError() throws Exception { // expects the /error endpoint to fail the reactive client call
+        String address = "http://localhost:" + PORT + "/reactor/mono/error";
+        
+        StepVerifier
+        .create(ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new ReactorInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(ReactorInvoker.class)
+            .get(HelloWorldBean.class))
+        .expectErrorMatches(ex -> ex.getCause() instanceof InternalServerErrorException) // matched on the cause, not the signal itself
+        .verify();
+    }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
index 300def8..658449b 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
@@ -57,7 +57,6 @@ public class MonoService {
     public void getTextAsync(@Suspended final AsyncResponse ar) {
         Mono.just("Hello, ").map(s -> s + "world!")
                 .subscribe(new StringAsyncSubscriber(ar));
-
     }
     
     @GET
@@ -67,6 +66,12 @@ public class MonoService {
         return Mono.empty(); 
     }
 
+    @GET
+    @Produces("application/json")
+    @Path("error")
+    public Mono<HelloWorldBean> getError() { // endpoint whose Mono always fails
+        return Mono.error(new RuntimeException("Oops")); // Mono that signals the RuntimeException instead of a value
+    }
 
     private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
         StringAsyncSubscriber(AsyncResponse ar) {