You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/11/26 01:28:14 UTC
[cxf] 01/01: CXF-8357: RxJava2/RxJava3: Add support of Maybe type
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch CXF-8357
in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 636e5bbdbee64e8d42f2f1e434fa062db75aa88a
Author: reta <dr...@gmail.com>
AuthorDate: Tue Nov 24 19:58:29 2020 -0500
CXF-8357: RxJava2/RxJava3: Add support of Maybe type
---
.../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java | 18 +++
.../cxf/jaxrs/rx3/server/ReactiveIOInvoker.java | 18 +++
.../jaxrs/reactive/JAXRSRxJava2MaybeTest.java | 136 +++++++++++++++++++++
.../jaxrs/reactive/JAXRSRxJava3MaybeTest.java | 136 +++++++++++++++++++++
.../systest/jaxrs/reactive/RxJava2MaybeServer.java | 73 +++++++++++
.../RxJava2MaybeService.java} | 53 ++++----
.../systest/jaxrs/reactive/RxJava3MaybeServer.java | 73 +++++++++++
.../RxJava3MaybeService.java} | 53 ++++----
.../cxf/systest/jaxrs/reactor/MonoReactorTest.java | 18 +++
.../cxf/systest/jaxrs/reactor/MonoService.java | 7 +-
10 files changed, 536 insertions(+), 49 deletions(-)
diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
index 4b87432..4bf5a43 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
@@ -25,6 +25,7 @@ import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
import org.apache.cxf.message.Message;
import io.reactivex.Flowable;
+import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
@@ -37,10 +38,21 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
return handleSingle(inMessage, (Single<?>)result);
} else if (result instanceof Observable) {
return handleObservable(inMessage, (Observable<?>)result);
+ } else if (result instanceof Maybe) {
+ return handleMaybe(inMessage, (Maybe<?>)result);
}
return null;
}
+ protected AsyncResponseImpl handleMaybe(Message inMessage, Maybe<?> maybe) { // Bridges a Maybe result onto a new AsyncResponseImpl created for inMessage.
+ final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+ Disposable d = subscribe(maybe, asyncResponse); // subscribes right away; the callbacks resume asyncResponse
+ if (d == null) { // defensive check: fail fast if no subscription handle came back
+ throw new IllegalStateException("Subscribe did not return a Disposable");
+ }
+ return asyncResponse; // the Disposable is only null-checked, not retained
+ }
+
protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
Disposable d = single.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
@@ -81,4 +93,10 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
.switchIfEmpty(Observable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
}
+
+ private <T> Disposable subscribe(Maybe<T> maybe, final AsyncResponseImpl asyncResponse) { // Wires the Maybe's outcomes to asyncResponse and returns the subscription handle.
+ return maybe
+ .switchIfEmpty(Maybe.<T>empty().doOnComplete(() -> asyncResponse.resume(null))) // empty source: fall back to an empty Maybe whose completion resumes with null
+ .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); // value: resume with it; error: delegate to handleThrowable
+ }
}
diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
index 4d6be4f..c9162d9 100644
--- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
+++ b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
@@ -25,6 +25,7 @@ import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
import org.apache.cxf.message.Message;
import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.disposables.Disposable;
@@ -37,10 +38,21 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
return handleSingle(inMessage, (Single<?>)result);
} else if (result instanceof Observable) {
return handleObservable(inMessage, (Observable<?>)result);
+ } else if (result instanceof Maybe) {
+ return handleMaybe(inMessage, (Maybe<?>)result);
}
return null;
}
+ protected AsyncResponseImpl handleMaybe(Message inMessage, Maybe<?> maybe) { // Bridges an RxJava3 Maybe result onto a new AsyncResponseImpl created for inMessage.
+ final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+ Disposable d = subscribe(maybe, asyncResponse); // subscribes right away; the callbacks resume asyncResponse
+ if (d == null) { // defensive check: fail fast if no subscription handle came back
+ throw new IllegalStateException("Subscribe did not return a Disposable");
+ }
+ return asyncResponse; // the Disposable is only null-checked, not retained
+ }
+
protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
Disposable d = single.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
@@ -81,4 +93,10 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
.switchIfEmpty(Observable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
}
+
+ private <T> Disposable subscribe(Maybe<T> maybe, final AsyncResponseImpl asyncResponse) { // Wires the RxJava3 Maybe's outcomes to asyncResponse and returns the subscription handle.
+ return maybe
+ .switchIfEmpty(Maybe.<T>empty().doOnComplete(() -> asyncResponse.resume(null))) // empty source: fall back to an empty Maybe whose completion resumes with null
+ .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); // value: resume with it; error: delegate to handleThrowable
+ }
}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2MaybeTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2MaybeTest.java
new file mode 100644
index 0000000..f4f351e
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2MaybeTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.Flowable;
+import io.reactivex.subscribers.TestSubscriber;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JAXRSRxJava2MaybeTest extends AbstractBusClientServerTestBase { // Client-side tests against the /rx2/maybe endpoints hosted by RxJava2MaybeServer.
+ public static final String PORT = RxJava2MaybeServer.PORT; // reuse the port the server class allocated
+ @BeforeClass
+ public static void startServers() throws Exception { // launches the server once for the whole class
+ AbstractResourceInfo.clearAllMaps();
+ assertTrue("server did not launch correctly", launchServer(RxJava2MaybeServer.class, true));
+ createStaticBus();
+ }
+ @Test
+ public void testGetHelloWorldJson() throws Exception { // value case: JSON response read as a HelloWorldBean
+ String address = "http://localhost:" + PORT + "/rx2/maybe/textJson";
+
+ final Flowable<HelloWorldBean> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get(HelloWorldBean.class);
+
+ final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS); // waits up to 3 seconds; the return value is not checked
+ subscriber
+ .assertValue(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+ .assertComplete();
+ }
+
+ @Test
+ public void testGetString() throws Exception { // value case: text/plain response read as a String
+ String address = "http://localhost:" + PORT + "/rx2/maybe/textAsync";
+
+ final Flowable<String> obs = ClientBuilder
+ .newClient()
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.TEXT_PLAIN)
+ .rx(FlowableRxInvoker.class)
+ .get(String.class);
+
+ final TestSubscriber<String> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS); // waits up to 3 seconds; the return value is not checked
+ subscriber
+ .assertValue(r -> "Hello, world!".equals(r))
+ .assertComplete();
+ }
+
+ @Test
+ public void testGetError() throws Exception { // error case: the client is expected to see InternalServerErrorException
+ String address = "http://localhost:" + PORT + "/rx2/maybe/error";
+
+ final Flowable<String> obs = ClientBuilder
+ .newClient()
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get(String.class);
+
+ final TestSubscriber<String> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS); // waits up to 3 seconds; the return value is not checked
+ subscriber.assertError(InternalServerErrorException.class);
+ }
+
+ @Test
+ public void testGetHelloWorldEmpty() throws Exception { // empty case: a response is expected, but without an entity
+ String address = "http://localhost:" + PORT + "/rx2/maybe/empty";
+
+ final Flowable<Response> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get(); // no entity type requested, so the Flowable carries the Response itself
+
+ final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS); // waits up to 3 seconds; the return value is not checked
+ subscriber
+ .assertValue(r -> !r.hasEntity())
+ .assertComplete();
+ }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3MaybeTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3MaybeTest.java
new file mode 100644
index 0000000..1c4c95e
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3MaybeTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.subscribers.TestSubscriber;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JAXRSRxJava3MaybeTest extends AbstractBusClientServerTestBase { // Client-side tests against the /rx3/maybe endpoints hosted by RxJava3MaybeServer.
+ public static final String PORT = RxJava3MaybeServer.PORT; // reuse the port the server class allocated
+ @BeforeClass
+ public static void startServers() throws Exception { // launches the server once for the whole class
+ AbstractResourceInfo.clearAllMaps();
+ assertTrue("server did not launch correctly", launchServer(RxJava3MaybeServer.class, true));
+ createStaticBus();
+ }
+ @Test
+ public void testGetHelloWorldJson() throws Exception { // value case: JSON response read as a HelloWorldBean
+ String address = "http://localhost:" + PORT + "/rx3/maybe/textJson";
+
+ final Flowable<HelloWorldBean> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get(HelloWorldBean.class);
+
+ final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS); // waits up to 3 seconds; the return value is not checked
+ subscriber
+ .assertValue(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+ .assertComplete();
+ }
+
+ @Test
+ public void testGetString() throws Exception { // value case: text/plain response read as a String
+ String address = "http://localhost:" + PORT + "/rx3/maybe/textAsync";
+
+ final Flowable<String> obs = ClientBuilder
+ .newClient()
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.TEXT_PLAIN)
+ .rx(FlowableRxInvoker.class)
+ .get(String.class);
+
+ final TestSubscriber<String> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS); // waits up to 3 seconds; the return value is not checked
+ subscriber
+ .assertValue(r -> "Hello, world!".equals(r))
+ .assertComplete();
+ }
+
+ @Test
+ public void testGetError() throws Exception { // error case: the client is expected to see InternalServerErrorException
+ String address = "http://localhost:" + PORT + "/rx3/maybe/error";
+
+ final Flowable<String> obs = ClientBuilder
+ .newClient()
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get(String.class);
+
+ final TestSubscriber<String> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS); // waits up to 3 seconds; the return value is not checked
+ subscriber.assertError(InternalServerErrorException.class);
+ }
+
+ @Test
+ public void testGetHelloWorldEmpty() throws Exception { // empty case: a response is expected, but without an entity
+ String address = "http://localhost:" + PORT + "/rx3/maybe/empty";
+
+ final Flowable<Response> obs = ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new FlowableRxInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(FlowableRxInvoker.class)
+ .get(); // no entity type requested, so the Flowable carries the Response itself
+
+ final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+ obs.subscribe(subscriber);
+
+ subscriber.await(3, TimeUnit.SECONDS); // waits up to 3 seconds; the return value is not checked
+ subscriber
+ .assertValue(r -> !r.hasEntity())
+ .assertComplete();
+ }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeServer.java
new file mode 100644
index 0000000..58a79a2
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeServer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+public class RxJava2MaybeServer extends AbstractBusTestServerBase { // Test server that publishes RxJava2MaybeService over HTTP on localhost.
+ public static final String PORT = allocatePort(RxJava2MaybeServer.class);
+
+ org.apache.cxf.endpoint.Server server; // assigned in run(), cleared in tearDown()
+ public RxJava2MaybeServer() {
+ }
+
+ protected void run() { // builds and starts the JAX-RS endpoint
+ Bus bus = BusFactory.getDefaultBus();
+ // Make sure default JSONProvider is not loaded
+ bus.setProperty("skip.default.json.provider.registration", true);
+ JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+ sf.setProvider(new JacksonJsonProvider()); // Jackson is the JSON provider registered instead
+ new ReactiveIOCustomizer().customize(sf); // rx2 server customizer; presumably installs the reactive invoker - TODO confirm
+ sf.getOutInterceptors().add(new LoggingOutInterceptor());
+ sf.setResourceClasses(RxJava2MaybeService.class);
+ sf.setResourceProvider(RxJava2MaybeService.class,
+ new SingletonResourceProvider(new RxJava2MaybeService(), true)); // one service instance handed to the singleton provider
+ sf.setAddress("http://localhost:" + PORT + "/");
+ server = sf.create();
+ }
+
+ public void tearDown() throws Exception { // NOTE(review): throws NullPointerException if run() never assigned server
+ server.stop();
+ server.destroy();
+ server = null;
+ }
+
+ public static void main(String[] args) { // stand-alone launcher for running the server manually
+ try {
+ RxJava2MaybeServer s = new RxJava2MaybeServer();
+ s.start();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ System.exit(-1); // NOTE(review): exit normally does not return, so the finally block is likely skipped on this path
+ } finally {
+ System.out.println("done!");
+ }
+ }
+
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeService.java
similarity index 63%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeService.java
index 300def8..71e37b8 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeService.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.cxf.systest.jaxrs.reactor;
+package org.apache.cxf.systest.jaxrs.reactive;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -27,47 +27,52 @@ import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
-import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
+import org.apache.cxf.systest.jaxrs.reactor.HelloWorldBean;
-@Path("/mono")
-public class MonoService {
+import io.reactivex.Maybe;
- @GET
- @Produces("application/json")
- @Path("textJson")
- public Mono<HelloWorldBean> getJson() {
- return Mono.just(new HelloWorldBean());
- }
+@Path("/rx2/maybe")
+public class RxJava2MaybeService {
@GET
@Produces("application/json")
- @Path("textJsonImplicitListAsyncStream")
- public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
- Mono.just("Hello")
- .map(HelloWorldBean::new)
- .subscribeOn(Schedulers.elastic())
- .subscribe(new JsonStreamingAsyncSubscriber<>(ar, null, null, null, 1000L, 0L));
+ @Path("textJson")
+ public Maybe<HelloWorldBean> getJson() {
+ return Maybe.just(new HelloWorldBean());
}
@GET
@Produces("text/plain")
@Path("textAsync")
public void getTextAsync(@Suspended final AsyncResponse ar) {
- Mono.just("Hello, ").map(s -> s + "world!")
- .subscribe(new StringAsyncSubscriber(ar));
+ final StringAsyncSubscriber subscriber = new StringAsyncSubscriber(ar);
+
+ Maybe
+ .just("Hello, ")
+ .map(s -> s + "world!")
+ .subscribe(
+ s -> {
+ subscriber.onNext(s);
+ subscriber.onComplete();
+ },
+ subscriber::onError);
}
@GET
+ @Produces("application/json")
+ @Path("error")
+ public Maybe<HelloWorldBean> getError() {
+ return Maybe.error(new RuntimeException("Oops"));
+ }
+
+ @GET
@Produces(MediaType.APPLICATION_JSON)
- @Path("/empty")
- public Mono<HelloWorldBean> empty() {
- return Mono.empty();
+ @Path("empty")
+ public Maybe<HelloWorldBean> empty() {
+ return Maybe.empty();
}
-
private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
StringAsyncSubscriber(AsyncResponse ar) {
super(ar);
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeServer.java
new file mode 100644
index 0000000..550f989
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeServer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+public class RxJava3MaybeServer extends AbstractBusTestServerBase { // Test server that publishes RxJava3MaybeService over HTTP on localhost.
+ public static final String PORT = allocatePort(RxJava3MaybeServer.class);
+
+ org.apache.cxf.endpoint.Server server; // assigned in run(), cleared in tearDown()
+ public RxJava3MaybeServer() {
+ }
+
+ protected void run() { // builds and starts the JAX-RS endpoint
+ Bus bus = BusFactory.getDefaultBus();
+ // Make sure default JSONProvider is not loaded
+ bus.setProperty("skip.default.json.provider.registration", true);
+ JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+ sf.setProvider(new JacksonJsonProvider()); // Jackson is the JSON provider registered instead
+ new ReactiveIOCustomizer().customize(sf); // rx3 server customizer; presumably installs the reactive invoker - TODO confirm
+ sf.getOutInterceptors().add(new LoggingOutInterceptor());
+ sf.setResourceClasses(RxJava3MaybeService.class);
+ sf.setResourceProvider(RxJava3MaybeService.class,
+ new SingletonResourceProvider(new RxJava3MaybeService(), true)); // one service instance handed to the singleton provider
+ sf.setAddress("http://localhost:" + PORT + "/");
+ server = sf.create();
+ }
+
+ public void tearDown() throws Exception { // NOTE(review): throws NullPointerException if run() never assigned server
+ server.stop();
+ server.destroy();
+ server = null;
+ }
+
+ public static void main(String[] args) { // stand-alone launcher for running the server manually
+ try {
+ RxJava3MaybeServer s = new RxJava3MaybeServer();
+ s.start();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ System.exit(-1); // NOTE(review): exit normally does not return, so the finally block is likely skipped on this path
+ } finally {
+ System.out.println("done!");
+ }
+ }
+
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeService.java
similarity index 63%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeService.java
index 300def8..4673034 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeService.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.cxf.systest.jaxrs.reactor;
+package org.apache.cxf.systest.jaxrs.reactive;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -27,47 +27,52 @@ import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
-import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
+import org.apache.cxf.systest.jaxrs.reactor.HelloWorldBean;
-@Path("/mono")
-public class MonoService {
+import io.reactivex.rxjava3.core.Maybe;
- @GET
- @Produces("application/json")
- @Path("textJson")
- public Mono<HelloWorldBean> getJson() {
- return Mono.just(new HelloWorldBean());
- }
+@Path("/rx3/maybe")
+public class RxJava3MaybeService {
@GET
@Produces("application/json")
- @Path("textJsonImplicitListAsyncStream")
- public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
- Mono.just("Hello")
- .map(HelloWorldBean::new)
- .subscribeOn(Schedulers.elastic())
- .subscribe(new JsonStreamingAsyncSubscriber<>(ar, null, null, null, 1000L, 0L));
+ @Path("textJson")
+ public Maybe<HelloWorldBean> getJson() {
+ return Maybe.just(new HelloWorldBean());
}
@GET
@Produces("text/plain")
@Path("textAsync")
public void getTextAsync(@Suspended final AsyncResponse ar) {
- Mono.just("Hello, ").map(s -> s + "world!")
- .subscribe(new StringAsyncSubscriber(ar));
+ final StringAsyncSubscriber subscriber = new StringAsyncSubscriber(ar);
+
+ Maybe
+ .just("Hello, ")
+ .map(s -> s + "world!")
+ .subscribe(
+ s -> {
+ subscriber.onNext(s);
+ subscriber.onComplete();
+ },
+ subscriber::onError);
}
@GET
+ @Produces("application/json")
+ @Path("error")
+ public Maybe<HelloWorldBean> getError() {
+ return Maybe.error(new RuntimeException("Oops"));
+ }
+
+ @GET
@Produces(MediaType.APPLICATION_JSON)
- @Path("/empty")
- public Mono<HelloWorldBean> empty() {
- return Mono.empty();
+ @Path("empty")
+ public Maybe<HelloWorldBean> empty() {
+ return Maybe.empty();
}
-
private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
StringAsyncSubscriber(AsyncResponse ar) {
super(ar);
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
index 18054e3..bacac30 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
@@ -19,6 +19,7 @@
package org.apache.cxf.systest.jaxrs.reactor;
+import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
@@ -112,4 +113,21 @@ public class MonoReactorTest extends AbstractBusClientServerTestBase {
.expectComplete()
.verify();
}
+
+ @Test
+ public void testGetError() throws Exception { // error case for the /reactor/mono/error endpoint
+ String address = "http://localhost:" + PORT + "/reactor/mono/error";
+
+ StepVerifier
+ .create(ClientBuilder
+ .newClient()
+ .register(new JacksonJsonProvider())
+ .register(new ReactorInvokerProvider())
+ .target(address)
+ .request(MediaType.APPLICATION_JSON)
+ .rx(ReactorInvoker.class)
+ .get(HelloWorldBean.class))
+ .expectErrorMatches(ex -> ex.getCause() instanceof InternalServerErrorException) // matches on the cause, not on the top-level exception
+ .verify();
+ }
}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
index 300def8..658449b 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
@@ -57,7 +57,6 @@ public class MonoService {
public void getTextAsync(@Suspended final AsyncResponse ar) {
Mono.just("Hello, ").map(s -> s + "world!")
.subscribe(new StringAsyncSubscriber(ar));
-
}
@GET
@@ -67,6 +66,12 @@ public class MonoService {
return Mono.empty();
}
+ @GET
+ @Produces("application/json")
+ @Path("error")
+ public Mono<HelloWorldBean> getError() { // always returns a failing Mono; exercises the error path
+ return Mono.error(new RuntimeException("Oops"));
+ }
private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
StringAsyncSubscriber(AsyncResponse ar) {