You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2018/10/02 21:04:04 UTC

[cxf] branch master updated: CXF-7854: Refactor RxJava2 Flowable and Observable Rx Invokers to not… (#451)

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new a5e24a8  CXF-7854: Refactor RxJava2 Flowable and Observable Rx Invokers to not… (#451)
a5e24a8 is described below

commit a5e24a843430fe1f41823601ca1600ead8493017
Author: John Koehler <jk...@users.noreply.github.com>
AuthorDate: Tue Oct 2 16:03:42 2018 -0500

    CXF-7854: Refactor RxJava2 Flowable and Observable Rx Invokers to not… (#451)
    
    * CXF-7854: Refactor RxJava2 Flowable and Observable Rx Invokers to not use internal APIs
    
    * CXF-7854: Review comment - create the Flowable just once
    
    * CXF-7854: Review comment - use supplier to simplify the code
---
 .../jaxrs/rx2/client/FlowableRxInvokerImpl.java    | 60 ++++++++++++++--------
 .../rx2/client/FlowableRxInvokerProvider.java      |  7 +--
 .../jaxrs/rx2/client/ObservableRxInvokerImpl.java  | 58 +++++++++++++--------
 .../rx2/client/ObservableRxInvokerProvider.java    |  7 +--
 .../jaxrs/reactive/JAXRSRxJava2ObservableTest.java | 31 +++++++++++
 .../jaxrs/reactive/RxJava2ObservableService.java   | 22 ++++++--
 6 files changed, 130 insertions(+), 55 deletions(-)

diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
index be5c2d5..f34ba56 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
@@ -19,24 +19,28 @@
 package org.apache.cxf.jaxrs.rx2.client;
 
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.SyncInvoker;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.Response;
 
-import org.apache.cxf.jaxrs.client.WebClient;
-
+import io.reactivex.BackpressureStrategy;
 import io.reactivex.Flowable;
+import io.reactivex.FlowableEmitter;
+import io.reactivex.FlowableOnSubscribe;
 import io.reactivex.Scheduler;
 import io.reactivex.schedulers.Schedulers;
 
 
 public class FlowableRxInvokerImpl implements FlowableRxInvoker {
     private Scheduler sc;
-    private WebClient wc;
-    public FlowableRxInvokerImpl(WebClient wc, ExecutorService ex) {
-        this.wc = wc;
+    private SyncInvoker syncInvoker;
+    
+    public FlowableRxInvokerImpl(SyncInvoker syncInvoker, ExecutorService ex) {
+        this.syncInvoker = syncInvoker;
         this.sc = ex == null ? null : Schedulers.from(ex);
     }
 
@@ -147,34 +151,50 @@ public class FlowableRxInvokerImpl implements FlowableRxInvoker {
 
     @Override
     public <T> Flowable<T> method(String name, Entity<?> entity, Class<T> responseType) {
-        if (sc == null) {
-            return Flowable.fromFuture(wc.async().method(name, entity, responseType));
-        }
-        return Flowable.fromFuture(wc.async().method(name, entity, responseType), sc);
+        return create(() -> syncInvoker.method(name, entity, responseType));
     }
-
+    
     @Override
     public <T> Flowable<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
-        if (sc == null) {
-            return Flowable.fromFuture(wc.async().method(name, entity, responseType));
-        }
-        return Flowable.fromFuture(wc.async().method(name, entity, responseType), sc);
+        return create(() -> syncInvoker.method(name, entity, responseType));
     }
 
     @Override
     public <T> Flowable<T> method(String name, Class<T> responseType) {
-        if (sc == null) {
-            return Flowable.fromFuture(wc.async().method(name, responseType));
-        }
-        return Flowable.fromFuture(wc.async().method(name, responseType), sc);
+        return create(() -> syncInvoker.method(name, responseType));
     }
 
     @Override
     public <T> Flowable<T> method(String name, GenericType<T> responseType) {
+        return create(() -> syncInvoker.method(name, responseType));
+    }
+    
+    private <T> Flowable<T> create(Supplier<T> supplier) {
+        Flowable<T> flowable = Flowable.create(new FlowableOnSubscribe<T>() {
+            @Override
+            public void subscribe(FlowableEmitter<T> emitter) throws Exception {
+                try {
+                    T response = supplier.get();
+                    if (!emitter.isCancelled()) {
+                        emitter.onNext(response);
+                    }
+                    
+                    if (!emitter.isCancelled()) {
+                        emitter.onComplete();
+                    }
+                } catch (Throwable e) {
+                    if (!emitter.isCancelled()) {
+                        emitter.onError(e);
+                    }
+                }
+            }
+        }, BackpressureStrategy.DROP);
+        
         if (sc == null) {
-            return Flowable.fromFuture(wc.async().method(name, responseType));
+            return flowable.subscribeOn(Schedulers.io());
         }
-        return Flowable.fromFuture(wc.async().method(name, responseType), sc);
+        
+        return flowable.subscribeOn(sc).observeOn(sc);
     }
 
 }
diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
index e4e0e71..6b61b51 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
@@ -24,17 +24,12 @@ import javax.ws.rs.client.RxInvokerProvider;
 import javax.ws.rs.client.SyncInvoker;
 import javax.ws.rs.ext.Provider;
 
-import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
-
 @Provider
 public class FlowableRxInvokerProvider implements RxInvokerProvider<FlowableRxInvoker> {
 
     @Override
     public FlowableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
-        // TODO: At the moment we still delegate if possible to the async HTTP conduit.
-        // Investigate if letting the RxJava thread pool deal with the sync invocation
-        // is indeed more effective
-        return new FlowableRxInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), executorService);
+        return new FlowableRxInvokerImpl(syncInvoker, executorService);
     }
 
     @Override
diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
index 2c1f966..1cb1b9f 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
@@ -19,24 +19,26 @@
 package org.apache.cxf.jaxrs.rx2.client;
 
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.SyncInvoker;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.Response;
 
-import org.apache.cxf.jaxrs.client.WebClient;
-
 import io.reactivex.Observable;
+import io.reactivex.ObservableEmitter;
+import io.reactivex.ObservableOnSubscribe;
 import io.reactivex.Scheduler;
 import io.reactivex.schedulers.Schedulers;
 
 
 public class ObservableRxInvokerImpl implements ObservableRxInvoker {
     private Scheduler sc;
-    private WebClient wc;
-    public ObservableRxInvokerImpl(WebClient wc, ExecutorService ex) {
-        this.wc = wc;
+    private SyncInvoker syncInvoker;
+    public ObservableRxInvokerImpl(SyncInvoker syncInvoker, ExecutorService ex) {
+        this.syncInvoker = syncInvoker;
         this.sc = ex == null ? null : Schedulers.from(ex);
     }
 
@@ -147,34 +149,50 @@ public class ObservableRxInvokerImpl implements ObservableRxInvoker {
 
     @Override
     public <T> Observable<T> method(String name, Entity<?> entity, Class<T> responseType) {
-        if (sc == null) {
-            return Observable.fromFuture(wc.async().method(name, entity, responseType));
-        }
-        return Observable.fromFuture(wc.async().method(name, entity, responseType), sc);
+        return create(() -> syncInvoker.method(name, entity, responseType));
     }
-
+    
     @Override
     public <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
-        if (sc == null) {
-            return Observable.fromFuture(wc.async().method(name, entity, responseType));
-        }
-        return Observable.fromFuture(wc.async().method(name, entity, responseType), sc);
+        return create(() -> syncInvoker.method(name, entity, responseType));
     }
 
     @Override
     public <T> Observable<T> method(String name, Class<T> responseType) {
-        if (sc == null) {
-            return Observable.fromFuture(wc.async().method(name, responseType));
-        }
-        return Observable.fromFuture(wc.async().method(name, responseType), sc);
+        return create(() -> syncInvoker.method(name, responseType));
     }
 
     @Override
     public <T> Observable<T> method(String name, GenericType<T> responseType) {
+        return create(() -> syncInvoker.method(name, responseType));
+    }
+    
+    private <T> Observable<T> create(Supplier<T> supplier) {
+        Observable<T> observable = Observable.create(new ObservableOnSubscribe<T>() {
+            @Override
+            public void subscribe(ObservableEmitter<T> emitter) throws Exception {
+                try {
+                    T response = supplier.get();
+                    if (!emitter.isDisposed()) {
+                        emitter.onNext(response);
+                    }
+                    
+                    if (!emitter.isDisposed()) {
+                        emitter.onComplete();
+                    }
+                } catch (Throwable e) {
+                    if (!emitter.isDisposed()) {
+                        emitter.onError(e);
+                    }
+                }
+            }
+        });
+        
         if (sc == null) {
-            return Observable.fromFuture(wc.async().method(name, responseType));
+            return observable.subscribeOn(Schedulers.io());
         }
-        return Observable.fromFuture(wc.async().method(name, responseType), sc);
+        
+        return observable.subscribeOn(sc).observeOn(sc);
     }
 
 }
diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
index 221bc48..5ab701c 100644
--- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
@@ -24,17 +24,12 @@ import javax.ws.rs.client.RxInvokerProvider;
 import javax.ws.rs.client.SyncInvoker;
 import javax.ws.rs.ext.Provider;
 
-import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
-
 @Provider
 public class ObservableRxInvokerProvider implements RxInvokerProvider<ObservableRxInvoker> {
 
     @Override
     public ObservableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
-        // TODO: At the moment we still delegate if possible to the async HTTP conduit.
-        // Investigate if letting the RxJava thread pool deal with the sync invocation
-        // is indeed more effective
-        return new ObservableRxInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), executorService);
+        return new ObservableRxInvokerImpl(syncInvoker, executorService);
     }
 
     @Override
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
index dc8ef13..081a518 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
@@ -19,9 +19,11 @@
 
 package org.apache.cxf.systest.jaxrs.reactive;
 
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
+import javax.ws.rs.core.GenericType;
 import javax.xml.ws.Holder;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
@@ -49,6 +51,14 @@ public class JAXRSRxJava2ObservableTest extends AbstractBusClientServerTestBase
         createStaticBus();
     }
     @Test
+    public void testGetHelloWorldText() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/observable/text";
+        WebClient wc = WebClient.create(address);
+        String text = wc.accept("text/plain").get(String.class);
+        assertEquals("Hello, world!", text);
+    }
+
+    @Test
     public void testGetHelloWorldJson() throws Exception {
         String address = "http://localhost:" + PORT + "/rx2/observable/textJson";
         List<Object> providers = new LinkedList<>();
@@ -70,4 +80,25 @@ public class JAXRSRxJava2ObservableTest extends AbstractBusClientServerTestBase
         assertEquals("Hello", holder.value.getGreeting());
         assertEquals("World", holder.value.getAudience());
     }
+    
+    @Test
+    public void testGetHelloWorldJsonList() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/observable/textJsonList";
+        doTestGetHelloWorldJsonList(address);
+    } 
+    
+    private void doTestGetHelloWorldJsonList(String address) throws Exception {
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new JacksonJsonProvider()));
+        WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
+        GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() {
+        };
+
+        List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType);
+        assertEquals(2, beans.size());
+        assertEquals("Hello", beans.get(0).getGreeting());
+        assertEquals("World", beans.get(0).getAudience());
+        assertEquals("Ciao", beans.get(1).getGreeting());
+        assertEquals("World", beans.get(1).getAudience());
+    }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
index abb6e72..f08147d 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
@@ -19,6 +19,8 @@
 
 package org.apache.cxf.systest.jaxrs.reactive;
 
+import java.util.Arrays;
+import java.util.List;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -26,10 +28,15 @@ import javax.ws.rs.Produces;
 
 import io.reactivex.Observable;
 
-
 @Path("/rx2/observable")
 public class RxJava2ObservableService {
 
+    @GET
+    @Produces("text/plain")
+    @Path("text")
+    public Observable<String> getText() {
+        return Observable.just("Hello, world!");
+    }
     
     @GET
     @Produces("application/json")
@@ -37,6 +44,15 @@ public class RxJava2ObservableService {
     public Observable<HelloWorldBean> getJson() {
         return Observable.just(new HelloWorldBean());
     }
+    
+    @GET
+    @Produces("application/json")
+    @Path("textJsonList")
+    public Observable<List<HelloWorldBean>> getJsonList() {
+        HelloWorldBean bean1 = new HelloWorldBean();
+        HelloWorldBean bean2 = new HelloWorldBean();
+        bean2.setGreeting("Ciao");
+        return Observable.just(Arrays.asList(bean1, bean2));
+    }
+  
 }
-
-