You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/10/23 22:03:47 UTC

[cxf] branch 3.3.x-fixes updated (59f52b3 -> caf65df)

This is an automated email from the ASF dual-hosted git repository.

reta pushed a change to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git.


    from 59f52b3  Update Reactor to 3.2.21.RELEASE
     new ec0ad05  CXF-8355: RxJava3: Observable / Flowable Returns Mixed Response on Errors and Hangs when Empty
     new caf65df  Recording .gitmergeinfo Changes

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitmergeinfo                                      |   2 +
 .../cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java |   2 +
 .../cxf/jaxrs/rx3/server/ReactiveIOInvoker.java    |  17 +-
 .../IllegalArgumentExceptionMapper.java            |   2 +-
 .../IllegalStateExceptionMapper.java               |   2 +-
 .../jaxrs/reactive/JAXRSRxJava3FlowableTest.java   | 183 +++++++++++++++++++++
 .../jaxrs/reactive/JAXRSRxJava3ObservableTest.java |  70 ++++++++
 .../jaxrs/reactive/JAXRSRxJava3SingleTest.java     | 113 +++++++++++++
 .../jaxrs/reactive/RxJava3FlowableServer.java      |   2 +
 .../jaxrs/reactive/RxJava3FlowableService.java     |  75 +++++++++
 .../jaxrs/reactive/RxJava3ObservableServer.java    |   5 +-
 .../jaxrs/reactive/RxJava3ObservableService.java   |  25 +++
 ...ervableServer.java => RxJava3SingleServer.java} |  18 +-
 .../RxJava3SingleService.java}                     |  48 +++---
 14 files changed, 523 insertions(+), 41 deletions(-)
 copy systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/{reactor => reactive}/IllegalArgumentExceptionMapper.java (96%)
 copy systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/{reactor => reactive}/IllegalStateExceptionMapper.java (96%)
 create mode 100644 systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3SingleTest.java
 copy systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/{RxJavaObservableServer.java => RxJava3SingleServer.java} (80%)
 copy systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/{reactor/MonoService.java => reactive/RxJava3SingleService.java} (60%)


[cxf] 01/02: CXF-8355: RxJava3: Observable / Flowable Returns Mixed Response on Errors and Hangs when Empty

Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit ec0ad05e9720d8b4b893fc4ee2ca8ca1ed30cf7d
Author: reta <dr...@gmail.com>
AuthorDate: Sat Oct 10 18:22:38 2020 -0400

    CXF-8355: RxJava3: Observable / Flowable Returns Mixed Response on Errors and Hangs when Empty
    
    (cherry picked from commit 5fded377430dcfd461cf8714ec893cbf18f606a4)
---
 .../cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java |   2 +
 .../cxf/jaxrs/rx3/server/ReactiveIOInvoker.java    |  17 +-
 .../reactive/IllegalArgumentExceptionMapper.java   |  22 +--
 .../reactive/IllegalStateExceptionMapper.java      |  25 ++-
 .../jaxrs/reactive/JAXRSRxJava3FlowableTest.java   | 183 +++++++++++++++++++++
 .../jaxrs/reactive/JAXRSRxJava3ObservableTest.java |  70 ++++++++
 .../jaxrs/reactive/JAXRSRxJava3SingleTest.java     | 113 +++++++++++++
 .../jaxrs/reactive/RxJava3FlowableServer.java      |   2 +
 .../jaxrs/reactive/RxJava3FlowableService.java     |  75 +++++++++
 .../jaxrs/reactive/RxJava3ObservableServer.java    |   5 +-
 .../jaxrs/reactive/RxJava3ObservableService.java   |  25 +++
 ...ervableServer.java => RxJava3SingleServer.java} |  18 +-
 ...vableService.java => RxJava3SingleService.java} |  60 ++++---
 13 files changed, 555 insertions(+), 62 deletions(-)

diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
index bf07eec..592b9e6 100644
--- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
+++ b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
@@ -20,6 +20,7 @@ package org.apache.cxf.jaxrs.rx3.server;
 
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.jaxrs.reactivestreams.server.ResponseStatusOnlyExceptionMapper;
 import org.apache.cxf.service.invoker.Invoker;
 
 public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
@@ -31,6 +32,7 @@ public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
         if (useStreamingSubscriber != null) {
             invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
         }
+        bean.setProvider(new ResponseStatusOnlyExceptionMapper());
         return invoker;
     }
 }
diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
index 6739092..4d6be4f 100644
--- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
+++ b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
@@ -18,6 +18,8 @@
  */
 package org.apache.cxf.jaxrs.rx3.server;
 
+import java.util.Collections;
+
 import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
 import org.apache.cxf.message.Message;
@@ -51,7 +53,7 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
     protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
         if (!isStreamingSubscriberUsed(f, asyncResponse, inMessage)) {
-            Disposable d = f.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+            Disposable d = subscribe(f, asyncResponse);
             if (d == null) {
                 throw new IllegalStateException("Subscribe did not return a Disposable");
             }
@@ -61,11 +63,22 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
     
     protected AsyncResponseImpl handleObservable(Message inMessage, Observable<?> obs) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
-        Disposable d = obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
+        Disposable d = subscribe(obs, asyncResponse);
         if (d == null) {
             throw new IllegalStateException("Subscribe did not return a Disposable");
         }
         return asyncResponse;
     }
 
+    private <T> Disposable subscribe(final Flowable<T> f, final AsyncResponseImpl asyncResponse) {
+        return f
+            .switchIfEmpty(Flowable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
+            .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+    }
+
+    private <T> Disposable subscribe(final Observable<T> obs, final AsyncResponseImpl asyncResponse) {
+        return obs
+            .switchIfEmpty(Observable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList())))
+            .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
+    }
 }
diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalArgumentExceptionMapper.java
similarity index 53%
copy from rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalArgumentExceptionMapper.java
index bf07eec..a1dc412 100644
--- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalArgumentExceptionMapper.java
@@ -16,21 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.cxf.jaxrs.rx3.server;
 
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
-import org.apache.cxf.service.invoker.Invoker;
+package org.apache.cxf.systest.jaxrs.reactive;
 
-public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+@Provider
+public class IllegalArgumentExceptionMapper implements ExceptionMapper<IllegalArgumentException> {
     @Override
-    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
-        Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true)
-                .getOrDefault("useStreamingSubscriber", null);
-        ReactiveIOInvoker invoker = new ReactiveIOInvoker();
-        if (useStreamingSubscriber != null) {
-            invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
-        }
-        return invoker;
+    public Response toResponse(IllegalArgumentException exception) {
+        return Response.status(400).build();
     }
 }
diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalStateExceptionMapper.java
similarity index 53%
copy from rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalStateExceptionMapper.java
index bf07eec..95ae19b 100644
--- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalStateExceptionMapper.java
@@ -16,21 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.cxf.jaxrs.rx3.server;
 
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
-import org.apache.cxf.service.invoker.Invoker;
+package org.apache.cxf.systest.jaxrs.reactive;
 
-public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+@Provider
+public class IllegalStateExceptionMapper implements ExceptionMapper<IllegalStateException> {
     @Override
-    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
-        Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true)
-                .getOrDefault("useStreamingSubscriber", null);
-        ReactiveIOInvoker invoker = new ReactiveIOInvoker();
-        if (useStreamingSubscriber != null) {
-            invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
-        }
-        return invoker;
+    public Response toResponse(IllegalStateException exception) {
+        return Response
+            .status(409)
+            .entity(exception)
+            .build();
     }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java
index 9df86bc..980129e 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java
@@ -24,10 +24,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import javax.ws.rs.InternalServerErrorException;
 import javax.ws.rs.NotFoundException;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.Invocation;
 import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
@@ -64,6 +67,186 @@ public class JAXRSRxJava3FlowableTest extends AbstractBusClientServerTestBase {
     }
 
     @Test
+    public void testGetHelloWorldEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/flowable/empty";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testGetHelloWorldEmpty2() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx33/flowable/empty";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testFlowableImmediateErrors() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx33/flowable/immediate/errors";
+        
+        final Flowable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+        
+        final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+    
+    @Test
+    public void testFlowableErrorWithExceptionMapperReturnsNoExceptionPayload() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx33/flowable/mixed/error";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+            
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        // The response should not include the exception payload (injected by exception mapper)
+        // if some elements have been emitted before
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && !r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testFlowableErrorWithExceptionMapperReturnsContentPayload() throws Exception {
+        GenericType<List<HelloWorldBean>> helloWorldBeanListType = new GenericType<List<HelloWorldBean>>() {  };
+        String address = "http://localhost:" + PORT + "/rx33/flowable/mixed/error";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        // The response should include the emitted elements prior the error
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && r.readEntity(helloWorldBeanListType).size() == 4)
+            .assertComplete();
+    }
+    
+    @Test
+    public void testFlowableErrorsResponseWithMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx33/flowable/mapper/errors";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 400)
+            .assertComplete();
+    }
+    
+    @Test
+    public void testFlowableErrorWithWebException() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx33/flowable/web/errors";
+        
+        final Flowable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get();
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        // The response should not include the exception payload (injected by exception mapper)
+        // if some elements have been emitted before
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 403 && !r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testFlowableImmediateErrorsWithExceptionMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx33/flowable/immediate/mapper/errors";
+        
+        final Flowable<Response> obs = ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new FlowableRxInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(FlowableRxInvoker.class)
+                .get();
+        
+        final TestSubscriber<Response> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
+    
+    @Test
     public void testGetHelloWorldJson() throws Exception {
         String address = "http://localhost:" + PORT + "/rx3/flowable/textJson";
         List<Object> providers = new LinkedList<>();
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java
index 6b66577..f1eb9f0 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java
@@ -22,8 +22,13 @@ package org.apache.cxf.systest.jaxrs.reactive;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.xml.ws.Holder;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
@@ -36,6 +41,7 @@ import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 
 import io.reactivex.rxjava3.core.Observable;
 import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.observers.TestObserver;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -88,7 +94,71 @@ public class JAXRSRxJava3ObservableTest extends AbstractBusClientServerTestBase
         String address = "http://localhost:" + PORT + "/rx3/observable/textJsonList";
         doTestGetHelloWorldJsonList(address);
     }
+    
+    @Test
+    public void testGetHelloWorldEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/observable/empty";
+        
+        final Observable<Response> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new ObservableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(ObservableRxInvoker.class)
+            .get();
+        
+        final TestObserver<Response> subscriber = new TestObserver<>();
+        obs.subscribe(subscriber);
 
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "[]".equals(r.readEntity(String.class)))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testObservableImmediateErrors() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/observable/immediate/errors";
+        
+        final Observable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new ObservableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(ObservableRxInvoker.class)
+            .get(HelloWorldBean.class);
+        
+        final TestObserver<HelloWorldBean> subscriber = new TestObserver<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+    
+    @Test
+    public void testObservableImmediateErrorsWithExceptionMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/observable/immediate/mapper/errors";
+        
+        final Observable<Response> obs = ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ObservableRxInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ObservableRxInvoker.class)
+                .get();
+        
+        final TestObserver<Response> subscriber = new TestObserver<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getStatus() == 409 && r.readEntity(String.class).contains("stackTrace"))
+            .assertComplete();
+    }
+    
     private void doTestGetHelloWorldJsonList(String address) throws Exception {
         WebClient wc = WebClient.create(address,
                                         Collections.singletonList(new JacksonJsonProvider()));
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3SingleTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3SingleTest.java
new file mode 100644
index 0000000..563cfcd
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3SingleTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.MediaType;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.subscribers.TestSubscriber;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JAXRSRxJava3SingleTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = RxJava3SingleServer.PORT;
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly", launchServer(RxJava3SingleServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/single/textJson";
+
+        final Flowable<HelloWorldBean> obs = ClientBuilder
+            .newClient()
+            .register(new JacksonJsonProvider())
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+        
+        final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+            .assertComplete();
+    }
+
+    @Test
+    public void testGetString() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/single/textAsync";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.TEXT_PLAIN)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber
+            .assertValue(r -> "Hello, world!".equals(r))
+            .assertComplete();
+    }
+    
+    @Test
+    public void testGetError() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/single/error";
+        
+        final Flowable<String> obs = ClientBuilder
+            .newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(address)
+            .request(MediaType.APPLICATION_JSON)
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+        
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertError(InternalServerErrorException.class);
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java
index f78d918..109c2a2 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java
@@ -51,6 +51,8 @@ public class RxJava3FlowableServer extends AbstractBusTestServerBase {
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
         sf.getProperties(true).put("useStreamingSubscriber", useStreamingSubscriber);
         sf.setProvider(new JacksonJsonProvider());
+        sf.setProvider(new IllegalArgumentExceptionMapper());
+        sf.setProvider(new IllegalStateExceptionMapper());
         new ReactiveIOCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(RxJava3FlowableService.class);
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java
index af67f50..618d3e4 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java
@@ -24,14 +24,18 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import javax.ws.rs.ForbiddenException;
 import javax.ws.rs.GET;
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
 
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
 import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
+import org.apache.cxf.systest.jaxrs.reactor.HelloWorldBean;
 
 import io.reactivex.rxjava3.core.BackpressureStrategy;
 import io.reactivex.rxjava3.core.Flowable;
@@ -51,6 +55,77 @@ public class RxJava3FlowableService {
     }
     
     @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/empty")
+    public Flowable<HelloWorldBean> empty() { 
+        return Flowable.empty(); 
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/mapper/errors")
+    public Flowable<HelloWorldBean> mapperErrors() { 
+        return Flowable 
+            .range(1, 3) 
+            .flatMap(item -> { 
+                if (item < 3) { 
+                    return Flowable.just(new HelloWorldBean("Person " + item)); 
+                } else { 
+                    return Flowable.error(new IllegalArgumentException("Oops")); 
+                } 
+            }); 
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/web/errors")
+    public Flowable<HelloWorldBean> webErrors() { 
+        return Flowable 
+            .range(1, 3) 
+            .concatMap(item -> { 
+                if (item < 3) { 
+                    return Flowable.just(new HelloWorldBean("Person " + item)); 
+                } else { 
+                    return Flowable.error(new ForbiddenException("Oops")); 
+                } 
+            }); 
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/errors")
+    public Flowable<HelloWorldBean> immediateErrors() { 
+        return Flowable 
+            .range(1, 2) 
+            .flatMap(item -> Flowable.error(new RuntimeException("Oops"))); 
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/mapper/errors")
+    public Flowable<HelloWorldBean> immediateMapperErrors() { 
+        return Flowable 
+            .range(1, 2) 
+            .flatMap(item -> Flowable.error(new IllegalStateException("Oops"))); 
+    }
+    
+    @GET
+    @Path("/mixed/error")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Flowable<HelloWorldBean> errorAndData() {
+        return Flowable
+            .range(1, 5)
+            .flatMap(item -> {
+                if (item <= 4) {
+                    return Flowable.just(new HelloWorldBean(" of Item: " + item));
+                } else {
+                    return Flowable.error(new NotFoundException("Item not found"));
+                }
+            })
+            .onErrorResumeNext(e -> Flowable.error(new IllegalStateException("Oops", e)));
+    }
+    
+    @GET
     @Produces("application/json")
     @Path("textJsonImplicitListAsync")
     public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
index 0a8fcaa..6365bd8 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
@@ -26,7 +26,7 @@ import org.apache.cxf.BusFactory;
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.rx3.server.ReactiveIOInvoker;
+import org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
@@ -42,8 +42,9 @@ public class RxJava3ObservableServer extends AbstractBusTestServerBase {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setInvoker(new ReactiveIOInvoker());
         sf.setProvider(new JacksonJsonProvider());
+        sf.setProvider(new IllegalStateExceptionMapper());
+        new ReactiveIOCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(RxJava3ObservableService.class);
         sf.setResourceProvider(RxJava3ObservableService.class,
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
index 51d362c..7393d40 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
@@ -25,6 +25,7 @@ import java.util.List;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
 
 import io.reactivex.rxjava3.core.Observable;
 
@@ -55,4 +56,28 @@ public class RxJava3ObservableService {
         return Observable.just(Arrays.asList(bean1, bean2));
     }
   
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/empty")
+    public Observable<HelloWorldBean> empty() { 
+        return Observable.empty(); 
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/errors")
+    public Observable<HelloWorldBean> immediateErrors() { 
+        return Observable 
+            .range(1, 2) 
+            .flatMap(item -> Observable.error(new RuntimeException("Oops"))); 
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/mapper/errors")
+    public Observable<HelloWorldBean> immediateMapperErrors() { 
+        return Observable 
+            .range(1, 2) 
+            .flatMap(item -> Observable.error(new IllegalStateException("Oops"))); 
+    }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleServer.java
similarity index 80%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleServer.java
index 0a8fcaa..4b91c33b 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleServer.java
@@ -26,15 +26,15 @@ import org.apache.cxf.BusFactory;
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.rx3.server.ReactiveIOInvoker;
+import org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
-public class RxJava3ObservableServer extends AbstractBusTestServerBase {
-    public static final String PORT = allocatePort(RxJava3ObservableServer.class);
+public class RxJava3SingleServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJava3SingleServer.class);
 
     org.apache.cxf.endpoint.Server server;
-    public RxJava3ObservableServer() {
+    public RxJava3SingleServer() {
     }
 
     protected void run() {
@@ -42,12 +42,12 @@ public class RxJava3ObservableServer extends AbstractBusTestServerBase {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setInvoker(new ReactiveIOInvoker());
         sf.setProvider(new JacksonJsonProvider());
+        new ReactiveIOCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
-        sf.setResourceClasses(RxJava3ObservableService.class);
-        sf.setResourceProvider(RxJava3ObservableService.class,
-                               new SingletonResourceProvider(new RxJava3ObservableService(), true));
+        sf.setResourceClasses(RxJava3SingleService.class);
+        sf.setResourceProvider(RxJava3SingleService.class,
+                               new SingletonResourceProvider(new RxJava3SingleService(), true));
         sf.setAddress("http://localhost:" + PORT + "/");
         server = sf.create();
     }
@@ -60,7 +60,7 @@ public class RxJava3ObservableServer extends AbstractBusTestServerBase {
 
     public static void main(String[] args) {
         try {
-            RxJava3ObservableServer s = new RxJava3ObservableServer();
+            RxJava3SingleServer s = new RxJava3SingleServer();
             s.start();
         } catch (Exception ex) {
             ex.printStackTrace();
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleService.java
similarity index 50%
copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleService.java
index 51d362c..9fc0797 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleService.java
@@ -19,40 +19,54 @@
 
 package org.apache.cxf.systest.jaxrs.reactive;
 
-import java.util.Arrays;
-import java.util.List;
-
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
 
-import io.reactivex.rxjava3.core.Observable;
+import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
 
-@Path("/rx3/observable")
-public class RxJava3ObservableService {
+import io.reactivex.rxjava3.core.Single;
+
+@Path("/rx3/single")
+public class RxJava3SingleService {
 
-    @GET
-    @Produces("text/plain")
-    @Path("text")
-    public Observable<String> getText() {
-        return Observable.just("Hello, world!");
-    }
-    
     @GET
     @Produces("application/json")
     @Path("textJson")
-    public Observable<HelloWorldBean> getJson() {
-        return Observable.just(new HelloWorldBean());
+    public Single<HelloWorldBean> getJson() {
+        return Single.just(new HelloWorldBean());
+    }
+
+    @GET
+    @Produces("text/plain")
+    @Path("textAsync")
+    public void getTextAsync(@Suspended final AsyncResponse ar) {
+        final StringAsyncSubscriber subscriber = new StringAsyncSubscriber(ar);
+        
+        Single
+            .just("Hello, ")
+            .map(s -> s + "world!")
+            .subscribe(
+                s -> {
+                    subscriber.onNext(s);
+                    subscriber.onComplete();
+                },
+                subscriber::onError);
+
     }
     
     @GET
     @Produces("application/json")
-    @Path("textJsonList")
-    public Observable<List<HelloWorldBean>> getJsonList() {
-        HelloWorldBean bean1 = new HelloWorldBean();
-        HelloWorldBean bean2 = new HelloWorldBean();
-        bean2.setGreeting("Ciao");
-        return Observable.just(Arrays.asList(bean1, bean2));
+    @Path("error")
+    public Single<HelloWorldBean> getError() {
+        return Single.error(new RuntimeException("Oops"));
+    }
+
+    private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
+        StringAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
     }
-  
-}
+}
\ No newline at end of file


[cxf] 02/02: Recording .gitmergeinfo Changes

Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit caf65df4a67b5216ae8010c029780fcf7b2185c9
Author: reta <dr...@gmail.com>
AuthorDate: Fri Oct 23 17:54:53 2020 -0400

    Recording .gitmergeinfo Changes
---
 .gitmergeinfo | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.gitmergeinfo b/.gitmergeinfo
index 9883ebc..7b4bb15 100644
--- a/.gitmergeinfo
+++ b/.gitmergeinfo
@@ -391,6 +391,8 @@ B ff2b7c87de12d16cd71a62e7a1f0a0be3901b39e
 B ff7b028eeea39fedeb06207753fe6dcb9b0d8cca
 M 00c77f3e98985d339b4bb3488a7ea8799eefcdec
 M 02de4c435f4de72fc93291bebd953dc11d53c02d
+M 043794e29cc5ec5435787e266d7dad6ff762018e
+M 04fe61b2a9931d23eaad336192fe2c3dc429c2aa
 M 09ddfdeaef6f32537dba23fa6d7ef36992b3217b
 M 10251d33f0f379a5bebc640466b639382cbe42bb
 M 113e426f6b717925736433fe04a1b72f801dd574