You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2018/10/02 21:04:04 UTC
[cxf] branch master updated: CXF-7854: Refactor RxJava2 Flowable and Observable Rx Invokers to not use internal APIs (#451)
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new a5e24a8 CXF-7854: Refactor RxJava2 Flowable and Observable Rx Invokers to not use internal APIs (#451)
a5e24a8 is described below
commit a5e24a843430fe1f41823601ca1600ead8493017
Author: John Koehler <jk...@users.noreply.github.com>
AuthorDate: Tue Oct 2 16:03:42 2018 -0500
CXF-7854: Refactor RxJava2 Flowable and Observable Rx Invokers to not use internal APIs (#451)
* CXF-7854: Refactor RxJava2 Flowable and Observable Rx Invokers to not use internal APIs
* CXF-7854: Review comment - create the Flowable just once
* CXF-7854: Review comment - use supplier to simplify the code
---
.../jaxrs/rx2/client/FlowableRxInvokerImpl.java | 60 ++++++++++++++--------
.../rx2/client/FlowableRxInvokerProvider.java | 7 +--
.../jaxrs/rx2/client/ObservableRxInvokerImpl.java | 58 +++++++++++++--------
.../rx2/client/ObservableRxInvokerProvider.java | 7 +--
.../jaxrs/reactive/JAXRSRxJava2ObservableTest.java | 31 +++++++++++
.../jaxrs/reactive/RxJava2ObservableService.java | 22 ++++++--
6 files changed, 130 insertions(+), 55 deletions(-)
diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
index be5c2d5..f34ba56 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
@@ -19,24 +19,28 @@
package org.apache.cxf.jaxrs.rx2.client;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.SyncInvoker;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
-import org.apache.cxf.jaxrs.client.WebClient;
-
+import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
+import io.reactivex.FlowableEmitter;
+import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
public class FlowableRxInvokerImpl implements FlowableRxInvoker {
private Scheduler sc;
- private WebClient wc;
- public FlowableRxInvokerImpl(WebClient wc, ExecutorService ex) {
- this.wc = wc;
+ private SyncInvoker syncInvoker;
+
+ public FlowableRxInvokerImpl(SyncInvoker syncInvoker, ExecutorService ex) {
+ this.syncInvoker = syncInvoker;
this.sc = ex == null ? null : Schedulers.from(ex);
}
@@ -147,34 +151,50 @@ public class FlowableRxInvokerImpl implements FlowableRxInvoker {
@Override
public <T> Flowable<T> method(String name, Entity<?> entity, Class<T> responseType) {
- if (sc == null) {
- return Flowable.fromFuture(wc.async().method(name, entity, responseType));
- }
- return Flowable.fromFuture(wc.async().method(name, entity, responseType), sc);
+ return create(() -> syncInvoker.method(name, entity, responseType));
}
-
+
@Override
public <T> Flowable<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
- if (sc == null) {
- return Flowable.fromFuture(wc.async().method(name, entity, responseType));
- }
- return Flowable.fromFuture(wc.async().method(name, entity, responseType), sc);
+ return create(() -> syncInvoker.method(name, entity, responseType));
}
@Override
public <T> Flowable<T> method(String name, Class<T> responseType) {
- if (sc == null) {
- return Flowable.fromFuture(wc.async().method(name, responseType));
- }
- return Flowable.fromFuture(wc.async().method(name, responseType), sc);
+ return create(() -> syncInvoker.method(name, responseType));
}
@Override
public <T> Flowable<T> method(String name, GenericType<T> responseType) {
+ return create(() -> syncInvoker.method(name, responseType));
+ }
+
+ private <T> Flowable<T> create(Supplier<T> supplier) {
+ Flowable<T> flowable = Flowable.create(new FlowableOnSubscribe<T>() {
+ @Override
+ public void subscribe(FlowableEmitter<T> emitter) throws Exception {
+ try {
+ T response = supplier.get();
+ if (!emitter.isCancelled()) {
+ emitter.onNext(response);
+ }
+
+ if (!emitter.isCancelled()) {
+ emitter.onComplete();
+ }
+ } catch (Throwable e) {
+ if (!emitter.isCancelled()) {
+ emitter.onError(e);
+ }
+ }
+ }
+ }, BackpressureStrategy.DROP);
+
if (sc == null) {
- return Flowable.fromFuture(wc.async().method(name, responseType));
+ return flowable.subscribeOn(Schedulers.io());
}
- return Flowable.fromFuture(wc.async().method(name, responseType), sc);
+
+ return flowable.subscribeOn(sc).observeOn(sc);
}
}
diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
index e4e0e71..6b61b51 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
@@ -24,17 +24,12 @@ import javax.ws.rs.client.RxInvokerProvider;
import javax.ws.rs.client.SyncInvoker;
import javax.ws.rs.ext.Provider;
-import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
-
@Provider
public class FlowableRxInvokerProvider implements RxInvokerProvider<FlowableRxInvoker> {
@Override
public FlowableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
- // TODO: At the moment we still delegate if possible to the async HTTP conduit.
- // Investigate if letting the RxJava thread pool deal with the sync invocation
- // is indeed more effective
- return new FlowableRxInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), executorService);
+ return new FlowableRxInvokerImpl(syncInvoker, executorService);
}
@Override
diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
index 2c1f966..1cb1b9f 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
@@ -19,24 +19,26 @@
package org.apache.cxf.jaxrs.rx2.client;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.SyncInvoker;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
-import org.apache.cxf.jaxrs.client.WebClient;
-
import io.reactivex.Observable;
+import io.reactivex.ObservableEmitter;
+import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
public class ObservableRxInvokerImpl implements ObservableRxInvoker {
private Scheduler sc;
- private WebClient wc;
- public ObservableRxInvokerImpl(WebClient wc, ExecutorService ex) {
- this.wc = wc;
+ private SyncInvoker syncInvoker;
+ public ObservableRxInvokerImpl(SyncInvoker syncInvoker, ExecutorService ex) {
+ this.syncInvoker = syncInvoker;
this.sc = ex == null ? null : Schedulers.from(ex);
}
@@ -147,34 +149,50 @@ public class ObservableRxInvokerImpl implements ObservableRxInvoker {
@Override
public <T> Observable<T> method(String name, Entity<?> entity, Class<T> responseType) {
- if (sc == null) {
- return Observable.fromFuture(wc.async().method(name, entity, responseType));
- }
- return Observable.fromFuture(wc.async().method(name, entity, responseType), sc);
+ return create(() -> syncInvoker.method(name, entity, responseType));
}
-
+
@Override
public <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
- if (sc == null) {
- return Observable.fromFuture(wc.async().method(name, entity, responseType));
- }
- return Observable.fromFuture(wc.async().method(name, entity, responseType), sc);
+ return create(() -> syncInvoker.method(name, entity, responseType));
}
@Override
public <T> Observable<T> method(String name, Class<T> responseType) {
- if (sc == null) {
- return Observable.fromFuture(wc.async().method(name, responseType));
- }
- return Observable.fromFuture(wc.async().method(name, responseType), sc);
+ return create(() -> syncInvoker.method(name, responseType));
}
@Override
public <T> Observable<T> method(String name, GenericType<T> responseType) {
+ return create(() -> syncInvoker.method(name, responseType));
+ }
+
+ private <T> Observable<T> create(Supplier<T> supplier) {
+ Observable<T> observable = Observable.create(new ObservableOnSubscribe<T>() {
+ @Override
+ public void subscribe(ObservableEmitter<T> emitter) throws Exception {
+ try {
+ T response = supplier.get();
+ if (!emitter.isDisposed()) {
+ emitter.onNext(response);
+ }
+
+ if (!emitter.isDisposed()) {
+ emitter.onComplete();
+ }
+ } catch (Throwable e) {
+ if (!emitter.isDisposed()) {
+ emitter.onError(e);
+ }
+ }
+ }
+ });
+
if (sc == null) {
- return Observable.fromFuture(wc.async().method(name, responseType));
+ return observable.subscribeOn(Schedulers.io());
}
- return Observable.fromFuture(wc.async().method(name, responseType), sc);
+
+ return observable.subscribeOn(sc).observeOn(sc);
}
}
diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
index 221bc48..5ab701c 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
@@ -24,17 +24,12 @@ import javax.ws.rs.client.RxInvokerProvider;
import javax.ws.rs.client.SyncInvoker;
import javax.ws.rs.ext.Provider;
-import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
-
@Provider
public class ObservableRxInvokerProvider implements RxInvokerProvider<ObservableRxInvoker> {
@Override
public ObservableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
- // TODO: At the moment we still delegate if possible to the async HTTP conduit.
- // Investigate if letting the RxJava thread pool deal with the sync invocation
- // is indeed more effective
- return new ObservableRxInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), executorService);
+ return new ObservableRxInvokerImpl(syncInvoker, executorService);
}
@Override
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
index dc8ef13..081a518 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
@@ -19,9 +19,11 @@
package org.apache.cxf.systest.jaxrs.reactive;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import javax.ws.rs.core.GenericType;
import javax.xml.ws.Holder;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
@@ -49,6 +51,14 @@ public class JAXRSRxJava2ObservableTest extends AbstractBusClientServerTestBase
createStaticBus();
}
@Test
+ public void testGetHelloWorldText() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/observable/text";
+ WebClient wc = WebClient.create(address);
+ String text = wc.accept("text/plain").get(String.class);
+ assertEquals("Hello, world!", text);
+ }
+
+ @Test
public void testGetHelloWorldJson() throws Exception {
String address = "http://localhost:" + PORT + "/rx2/observable/textJson";
List<Object> providers = new LinkedList<>();
@@ -70,4 +80,25 @@ public class JAXRSRxJava2ObservableTest extends AbstractBusClientServerTestBase
assertEquals("Hello", holder.value.getGreeting());
assertEquals("World", holder.value.getAudience());
}
+
+ @Test
+ public void testGetHelloWorldJsonList() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/observable/textJsonList";
+ doTestGetHelloWorldJsonList(address);
+ }
+
+ private void doTestGetHelloWorldJsonList(String address) throws Exception {
+ WebClient wc = WebClient.create(address,
+ Collections.singletonList(new JacksonJsonProvider()));
+ WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
+ GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() {
+ };
+
+ List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType);
+ assertEquals(2, beans.size());
+ assertEquals("Hello", beans.get(0).getGreeting());
+ assertEquals("World", beans.get(0).getAudience());
+ assertEquals("Ciao", beans.get(1).getGreeting());
+ assertEquals("World", beans.get(1).getAudience());
+ }
}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
index abb6e72..f08147d 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
@@ -19,6 +19,8 @@
package org.apache.cxf.systest.jaxrs.reactive;
+import java.util.Arrays;
+import java.util.List;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -26,10 +28,15 @@ import javax.ws.rs.Produces;
import io.reactivex.Observable;
-
@Path("/rx2/observable")
public class RxJava2ObservableService {
+ @GET
+ @Produces("text/plain")
+ @Path("text")
+ public Observable<String> getText() {
+ return Observable.just("Hello, world!");
+ }
@GET
@Produces("application/json")
@@ -37,6 +44,15 @@ public class RxJava2ObservableService {
public Observable<HelloWorldBean> getJson() {
return Observable.just(new HelloWorldBean());
}
+
+ @GET
+ @Produces("application/json")
+ @Path("textJsonList")
+ public Observable<List<HelloWorldBean>> getJsonList() {
+ HelloWorldBean bean1 = new HelloWorldBean();
+ HelloWorldBean bean2 = new HelloWorldBean();
+ bean2.setGreeting("Ciao");
+ return Observable.just(Arrays.asList(bean1, bean2));
+ }
+
}
-
-