You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/11/26 01:28:13 UTC

[cxf] branch CXF-8357 created (now 636e5bb)

This is an automated email from the ASF dual-hosted git repository.

reta pushed a change to branch CXF-8357
in repository https://gitbox.apache.org/repos/asf/cxf.git.


      at 636e5bb  CXF-8357: RxJava2/RxJava3: Add support of Maybe type

This branch includes the following new commits:

     new 636e5bb  CXF-8357: RxJava2/RxJava3: Add support of Maybe type

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[cxf] 01/01: CXF-8357: RxJava2/RxJava3: Add support of Maybe type

Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch CXF-8357
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 636e5bbdbee64e8d42f2f1e434fa062db75aa88a
Author: reta <dr...@gmail.com>
AuthorDate: Tue Nov 24 19:58:29 2020 -0500

    CXF-8357: RxJava2/RxJava3: Add support of Maybe type
---
 .../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java    |  18 +++
 .../cxf/jaxrs/rx3/server/ReactiveIOInvoker.java    |  18 +++
 .../jaxrs/reactive/JAXRSRxJava2MaybeTest.java      | 136 +++++++++++++++++++++
 .../jaxrs/reactive/JAXRSRxJava3MaybeTest.java      | 136 +++++++++++++++++++++
 .../systest/jaxrs/reactive/RxJava2MaybeServer.java |  73 +++++++++++
 .../RxJava2MaybeService.java}                      |  53 ++++----
 .../systest/jaxrs/reactive/RxJava3MaybeServer.java |  73 +++++++++++
 .../RxJava3MaybeService.java}                      |  53 ++++----
 .../cxf/systest/jaxrs/reactor/MonoReactorTest.java |  18 +++
 .../cxf/systest/jaxrs/reactor/MonoService.java     |   7 +-
 10 files changed, 536 insertions(+), 49 deletions(-)

diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
index 4b87432..4bf5a43 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
@@ -25,6 +25,7 @@ import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
 import org.apache.cxf.message.Message;
 
 import io.reactivex.Flowable;
+import io.reactivex.Maybe;
 import io.reactivex.Observable;
 import io.reactivex.Single;
 import io.reactivex.disposables.Disposable;
@@ -37,10 +38,21 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
             return handleSingle(inMessage, (Single<?>)result);
         } else if (result instanceof Observable) {
             return handleObservable(inMessage, (Observable<?>)result);
+        } else if (result instanceof Maybe) {
+            return handleMaybe(inMessage, (Maybe<?>)result);
         }
         return null;
     }
     
+    protected AsyncResponseImpl handleMaybe(Message inMessage, Maybe<?> maybe) {
+        final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+        Disposable d = subscribe(maybe, asyncResponse);
+        if (d == null) {
+            throw new IllegalStateException("Subscribe did not return a Disposable");
+        }
+        return asyncResponse;
+    }
+    
     protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
         Disposable d = single.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
@@ -81,4 +93,10 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
             .switchIfEmpty(Observable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
             .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
     }
+    
+    private <T> Disposable subscribe(Maybe<T> maybe, final AsyncResponseImpl asyncResponse) {
+        return maybe
+            .switchIfEmpty(Maybe.<T>empty().doOnComplete(() -> asyncResponse.resume(null)))
+            .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+    }
 }
diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
index 4d6be4f..c9162d9 100644
--- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
+++ b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
@@ -25,6 +25,7 @@ import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
 import org.apache.cxf.message.Message;
 
 import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
 import io.reactivex.rxjava3.core.Observable;
 import io.reactivex.rxjava3.core.Single;
 import io.reactivex.rxjava3.disposables.Disposable;
@@ -37,10 +38,21 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
             return handleSingle(inMessage, (Single<?>)result);
         } else if (result instanceof Observable) {
             return handleObservable(inMessage, (Observable<?>)result);
+        } else if (result instanceof Maybe) {
+            return handleMaybe(inMessage, (Maybe<?>)result);
         }
         return null;
     }
     
+    protected AsyncResponseImpl handleMaybe(Message inMessage, Maybe<?> maybe) {
+        final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+        Disposable d = subscribe(maybe, asyncResponse);
+        if (d == null) {
+            throw new IllegalStateException("Subscribe did not return a Disposable");
+        }
+        return asyncResponse;
+    }
+    
     protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
         Disposable d = single.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
@@ -81,4 +93,10 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
             .switchIfEmpty(Observable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
             .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
     }
+    
+    private <T> Disposable subscribe(Maybe<T> maybe, final AsyncResponseImpl asyncResponse) {
+        return maybe
+            .switchIfEmpty(Maybe.<T>empty().doOnComplete(() -> asyncResponse.resume(null)))
+            .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+    }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2MaybeTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2MaybeTest.java
new file mode 100644
index 0000000..f4f351e
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2MaybeTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.Flowable;
+import io.reactivex.subscribers.TestSubscriber;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JAXRSRxJava2MaybeTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = RxJava2MaybeServer.PORT;
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly", launchServer(RxJava2MaybeServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/maybe/textJson";
+
+        final Flowable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+        
+        final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+            .assertComplete();
+    }
+
+    @Test
+    public void testGetString() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/maybe/textAsync";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.TEXT_PLAIN)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "Hello, world!".equals(r))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testGetError() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/maybe/error";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+    
+    @Test
+    public void testGetHelloWorldEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/maybe/empty";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> !r.hasEntity())
+            .assertComplete();
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3MaybeTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3MaybeTest.java
new file mode 100644
index 0000000..1c4c95e
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3MaybeTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.subscribers.TestSubscriber;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JAXRSRxJava3MaybeTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = RxJava3MaybeServer.PORT;
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly", launchServer(RxJava3MaybeServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/maybe/textJson";
+
+        final Flowable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+        
+        final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+            .assertComplete();
+    }
+
+    @Test
+    public void testGetString() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/maybe/textAsync";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.TEXT_PLAIN)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "Hello, world!".equals(r))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testGetError() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/maybe/error";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+    
+    @Test
+    public void testGetHelloWorldEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/maybe/empty";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> !r.hasEntity())
+            .assertComplete();
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeServer.java
new file mode 100644
index 0000000..58a79a2
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeServer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+public class RxJava2MaybeServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJava2MaybeServer.class);
+
+    org.apache.cxf.endpoint.Server server;
+    public RxJava2MaybeServer() {
+    }
+
+    protected void run() {
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setProvider(new JacksonJsonProvider());
+        new ReactiveIOCustomizer().customize(sf);
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(RxJava2MaybeService.class);
+        sf.setResourceProvider(RxJava2MaybeService.class,
+                               new SingletonResourceProvider(new RxJava2MaybeService(), true));
+        sf.setAddress("http://localhost:" + PORT + "/");
+        server = sf.create();
+    }
+
+    public void tearDown() throws Exception {
+        server.stop();
+        server.destroy();
+        server = null;
+    }
+
+    public static void main(String[] args) {
+        try {
+            RxJava2MaybeServer s = new RxJava2MaybeServer();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeService.java
similarity index 63%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeService.java
index 300def8..71e37b8 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeService.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.cxf.systest.jaxrs.reactor;
+package org.apache.cxf.systest.jaxrs.reactive;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -27,47 +27,52 @@ import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
-import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
+import org.apache.cxf.systest.jaxrs.reactor.HelloWorldBean;
 
-@Path("/mono")
-public class MonoService {
+import io.reactivex.Maybe;
 
-    @GET
-    @Produces("application/json")
-    @Path("textJson")
-    public Mono<HelloWorldBean> getJson() {
-        return Mono.just(new HelloWorldBean());
-    }
+@Path("/rx2/maybe")
+public class RxJava2MaybeService {
 
     @GET
     @Produces("application/json")
-    @Path("textJsonImplicitListAsyncStream")
-    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
-        Mono.just("Hello")
-                .map(HelloWorldBean::new)
-                .subscribeOn(Schedulers.elastic())
-                .subscribe(new JsonStreamingAsyncSubscriber<>(ar, null, null, null, 1000L, 0L));
+    @Path("textJson")
+    public Maybe<HelloWorldBean> getJson() {
+        return Maybe.just(new HelloWorldBean());
     }
 
     @GET
     @Produces("text/plain")
     @Path("textAsync")
     public void getTextAsync(@Suspended final AsyncResponse ar) {
-        Mono.just("Hello, ").map(s -> s + "world!")
-                .subscribe(new StringAsyncSubscriber(ar));
+        final StringAsyncSubscriber subscriber = new StringAsyncSubscriber(ar);
+        
+        Maybe
+            .just("Hello, ")
+            .map(s -> s + "world!")
+            .subscribe(
+                s -> {
+                    subscriber.onNext(s);
+                    subscriber.onComplete();
+                },
+                subscriber::onError);
 
     }
     
     @GET
+    @Produces("application/json")
+    @Path("error")
+    public Maybe<HelloWorldBean> getError() {
+        return Maybe.error(new RuntimeException("Oops"));
+    }
+    
+    @GET
     @Produces(MediaType.APPLICATION_JSON)
-    @Path("/empty")
-    public Mono<HelloWorldBean> empty() { 
-        return Mono.empty(); 
+    @Path("empty")
+    public Maybe<HelloWorldBean> empty() { 
+        return Maybe.empty(); 
     }
 
-
     private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
         StringAsyncSubscriber(AsyncResponse ar) {
             super(ar);
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeServer.java
new file mode 100644
index 0000000..550f989
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeServer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+public class RxJava3MaybeServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJava3MaybeServer.class);
+
+    org.apache.cxf.endpoint.Server server;
+    public RxJava3MaybeServer() {
+    }
+
+    protected void run() {
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setProvider(new JacksonJsonProvider());
+        new ReactiveIOCustomizer().customize(sf);
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(RxJava3MaybeService.class);
+        sf.setResourceProvider(RxJava3MaybeService.class,
+                               new SingletonResourceProvider(new RxJava3MaybeService(), true));
+        sf.setAddress("http://localhost:" + PORT + "/");
+        server = sf.create();
+    }
+
+    public void tearDown() throws Exception {
+        server.stop();
+        server.destroy();
+        server = null;
+    }
+
+    public static void main(String[] args) {
+        try {
+            RxJava3MaybeServer s = new RxJava3MaybeServer();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeService.java
similarity index 63%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeService.java
index 300def8..4673034 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeService.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.cxf.systest.jaxrs.reactor;
+package org.apache.cxf.systest.jaxrs.reactive;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -27,47 +27,52 @@ import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
-import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
+import org.apache.cxf.systest.jaxrs.reactor.HelloWorldBean;
 
-@Path("/mono")
-public class MonoService {
+import io.reactivex.rxjava3.core.Maybe;
 
-    @GET
-    @Produces("application/json")
-    @Path("textJson")
-    public Mono<HelloWorldBean> getJson() {
-        return Mono.just(new HelloWorldBean());
-    }
+@Path("/rx3/maybe")
+public class RxJava3MaybeService {
 
     @GET
     @Produces("application/json")
-    @Path("textJsonImplicitListAsyncStream")
-    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
-        Mono.just("Hello")
-                .map(HelloWorldBean::new)
-                .subscribeOn(Schedulers.elastic())
-                .subscribe(new JsonStreamingAsyncSubscriber<>(ar, null, null, null, 1000L, 0L));
+    @Path("textJson")
+    public Maybe<HelloWorldBean> getJson() {
+        return Maybe.just(new HelloWorldBean());
     }
 
     @GET
     @Produces("text/plain")
     @Path("textAsync")
     public void getTextAsync(@Suspended final AsyncResponse ar) {
-        Mono.just("Hello, ").map(s -> s + "world!")
-                .subscribe(new StringAsyncSubscriber(ar));
+        final StringAsyncSubscriber subscriber = new StringAsyncSubscriber(ar);
+        
+        Maybe
+            .just("Hello, ")
+            .map(s -> s + "world!")
+            .subscribe(
+                s -> {
+                    subscriber.onNext(s);
+                    subscriber.onComplete();
+                },
+                subscriber::onError);
 
     }
     
     @GET
+    @Produces("application/json")
+    @Path("error")
+    public Maybe<HelloWorldBean> getError() {
+        return Maybe.error(new RuntimeException("Oops"));
+    }
+    
+    @GET
     @Produces(MediaType.APPLICATION_JSON)
-    @Path("/empty")
-    public Mono<HelloWorldBean> empty() { 
-        return Mono.empty(); 
+    @Path("empty")
+    public Maybe<HelloWorldBean> empty() { 
+        return Maybe.empty(); 
     }
 
-
     private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
         StringAsyncSubscriber(AsyncResponse ar) {
             super(ar);
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
index 18054e3..bacac30 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.cxf.systest.jaxrs.reactor;
 
+import javax.ws.rs.InternalServerErrorException;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.MediaType;
 
@@ -112,4 +113,21 @@ public class MonoReactorTest extends AbstractBusClientServerTestBase {
             .expectComplete()
             .verify();
     }
+    
+    @Test
+    public void testGetError() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor/mono/error";
+        
+        StepVerifier
+        .create(ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new ReactorInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(ReactorInvoker.class)
+            .get(HelloWorldBean.class))
+        .expectErrorMatches(ex -> ex.getCause() instanceof InternalServerErrorException)
+        .verify();
+    }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
index 300def8..658449b 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
@@ -57,7 +57,6 @@ public class MonoService {
     public void getTextAsync(@Suspended final AsyncResponse ar) {
         Mono.just("Hello, ").map(s -> s + "world!")
                 .subscribe(new StringAsyncSubscriber(ar));
-
     }
     
     @GET
@@ -67,6 +66,12 @@ public class MonoService {
         return Mono.empty(); 
     }
 
+    @GET
+    @Produces("application/json")
+    @Path("error")
+    public Mono<HelloWorldBean> getError() {
+        return Mono.error(new RuntimeException("Oops"));
+    }
 
     private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
         StringAsyncSubscriber(AsyncResponse ar) {