You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@cxf.apache.org by James Carman <ja...@carmanconsulting.com> on 2018/02/23 15:39:01 UTC

RxJava2 and Spring Boot?

I'm trying to get the RxJava support working using CXF 3.2.2 and Spring
Boot (1.5.10.RELEASE). I am using the following CXF libraries explicitly
(including transitive dependencies):

cxf-spring-boot-starter-jaxrs
cxf-rt-rs-extension-reactivestreams
cxf-rt-rs-extension-rx
io.reactivex.rxjava2:rxjava:2.1.3

I am following (I think) the instructions here:

http://cxf.apache.org/docs/jax-rs-rxjava.html#JAX-RSRxJava-Introduction


I have created the following CXF feature for installing the
ReactiveIOInvoker:

@Provider(value = Provider.Type.Feature,scope = Provider.Scope.Server)
public class RxJavaFeature extends AbstractFeature {
  @Override
  public void initialize(Server server, Bus bus) {
    final ReactiveIOInvoker invoker = new ReactiveIOInvoker();
    invoker.setUseStreamingSubscriberIfPossible(true);
    server.getEndpoint().getService().setInvoker(invoker);
  }
}

On the client side (this uses the framework that I mentioned yesterday),
I'm installing a FlowableRxInvokerProvider and an
ObservableRxInvokerProvider. My response keeps showing up empty, even
though I'm returning a Flowable.just(<MyReturnObject>):


2018-02-23 10:32:07.547 INFO 89838 --- [o-auto-1-exec-1]
o.a.cxf.services.RxJavaHelloImpl.REQ_IN : REQ_IN
Address: http://localhost:64452/api/hello/RxJava
HttpMethod: GET
ExchangeId: 0bb9cac2-7d48-4368-bb30-3c75948b0669
Headers: {Accept=application/json, host=localhost:64452,
connection=keep-alive, cache-control=no-cache, pragma=no-cache,
user-agent=Apache-CXF/3.2.2}


2018-02-23 10:32:07.575 INFO 89838 --- [o-auto-1-exec-1]
o.a.c.services.RxJavaHelloImpl.RESP_OUT : RESP_OUT
Content-Type: application/json
ResponseCode: 200
ExchangeId: 0bb9cac2-7d48-4368-bb30-3c75948b0669
Headers: {Date=Fri, 23 Feb 2018 15:32:07 GMT, Content-Type=application/json}
Payload: {}

Did I miss a step?

Thanks,

James

Re: RxJava2 and Spring Boot?

Posted by Andriy Redko <dr...@gmail.com>.
Hi James,

Glad you figured it out. It would be great if you could share your hurdles 
with client-side later, is it something we could improve in the integration 
or it is just a matter of properly documenting things. Thanks.

Best Regards,
    Andriy Redko

JC> So, the bug ended up being on my part.  I was using a "try-with-resources" block in my JSON provider (custom
JC> GSON-based implementation).  Since it's try-with-resources, it automatically was closing the InputStreamReader, and
JC> thus the underlying InputStream (entityStream), which is a clear violation of the advice of the JavaDocs for
JC> MessageBodyReader.  Oops!  I have rectified that error and now everything is flowing just fine from the server. 
JC> However, I couldn't get the invoker stuff working on the client side since I am using client proxies.  I ended up
JC> writing a custom FlowableMessageBodyReader and SingleMessageBodyReader that seem to be doing the trick.  My only
JC> concern is that the underlying entity response stream will be closed after a period of time if the Flowable isn't
JC> consumed in a timely fashion.  I'm tinkering around with that scenario, but it doesn't seem to be happening just yet.  


JC> Thanks for the reply!


JC> James
JC> On Sat, Feb 24, 2018 at 8:47 AM Andriy Redko <dr...@gmail.com> wrote:

JC> Hi James,

JC>  Seems like everything is in place, should be working fine. Would you
JC>  mind please to share the the @GET implementation? Could it be that
JC>  your object serializes to empty json (ignore nullable props, etc)?
JC>  Thanks.

JC>  Best Regards,
JC>      Andriy Redko

JC>  Friday, February 23, 2018, 10:39:01 AM, you wrote:

 JC>> I'm trying to get the RxJava support working using CXF 3.2.2 and Spring
 JC>> Boot (1.5.10.RELEASE). I am using the following CXF libraries explicitly
 JC>> (including transitive dependencies):

 JC>> cxf-spring-boot-starter-jaxrs
 JC>> cxf-rt-rs-extension-reactivestreams
 JC>> cxf-rt-rs-extension-rx
 JC>> io.reactivex.rxjava2:rxjava:2.1.3

 JC>> I am following (I think) the instructions here:

 JC>> http://cxf.apache.org/docs/jax-rs-rxjava.html#JAX-RSRxJava-Introduction


 JC>> I have created the following CXF feature for installing the
 JC>> ReactiveIOInvoker:

 JC>> @Provider(value = Provider.Type.Feature,scope = Provider.Scope.Server)
 JC>> public class RxJavaFeature extends AbstractFeature {
 JC>>   @Override
 JC>>   public void initialize(Server server, Bus bus) {
 JC>>     final ReactiveIOInvoker invoker = new ReactiveIOInvoker();
 JC>>     invoker.setUseStreamingSubscriberIfPossible(true);
 JC>>     server.getEndpoint().getService().setInvoker(invoker);
 JC>>   }
 JC>> }

 JC>> On the client side (this uses the framework that I mentioned yesterday),
 JC>> I'm installing a FlowableRxInvokerProvider and an
 JC>> ObservableRxInvokerProvider. My response keeps showing up empty, even
 JC>> though I'm returning a Flowable.just(<MyReturnObject>):


 JC>> 2018-02-23 10:32:07.547 INFO 89838 --- [o-auto-1-exec-1]
 JC>> o.a.cxf.services.RxJavaHelloImpl.REQ_IN : REQ_IN
 JC>> Address: http://localhost:64452/api/hello/RxJava
 JC>> HttpMethod: GET
 JC>> ExchangeId: 0bb9cac2-7d48-4368-bb30-3c75948b0669
 JC>> Headers: {Accept=application/json, host=localhost:64452,
 JC>> connection=keep-alive, cache-control=no-cache, pragma=no-cache,
 JC>> user-agent=Apache-CXF/3.2.2}


 JC>> 2018-02-23 10:32:07.575 INFO 89838 --- [o-auto-1-exec-1]
 JC>> o.a.c.services.RxJavaHelloImpl.RESP_OUT : RESP_OUT
 JC>> Content-Type: application/json
 JC>> ResponseCode: 200
 JC>> ExchangeId: 0bb9cac2-7d48-4368-bb30-3c75948b0669
 JC>> Headers: {Date=Fri, 23 Feb 2018 15:32:07 GMT, Content-Type=application/json}
 JC>> Payload: {}

 JC>> Did I miss a step?

 JC>> Thanks,

 JC>> James




Re: RxJava2 and Spring Boot?

Posted by Andriy Redko <dr...@gmail.com>.
Hi James,

Seems like everything is in place, should be working fine. Would you 
mind please to share the the @GET implementation? Could it be that 
your object serializes to empty json (ignore nullable props, etc)? 
Thanks.

Best Regards,
    Andriy Redko

Friday, February 23, 2018, 10:39:01 AM, you wrote:

JC> I'm trying to get the RxJava support working using CXF 3.2.2 and Spring
JC> Boot (1.5.10.RELEASE). I am using the following CXF libraries explicitly
JC> (including transitive dependencies):

JC> cxf-spring-boot-starter-jaxrs
JC> cxf-rt-rs-extension-reactivestreams
JC> cxf-rt-rs-extension-rx
JC> io.reactivex.rxjava2:rxjava:2.1.3

JC> I am following (I think) the instructions here:

JC> http://cxf.apache.org/docs/jax-rs-rxjava.html#JAX-RSRxJava-Introduction


JC> I have created the following CXF feature for installing the
JC> ReactiveIOInvoker:

JC> @Provider(value = Provider.Type.Feature,scope = Provider.Scope.Server)
JC> public class RxJavaFeature extends AbstractFeature {
JC>   @Override
JC>   public void initialize(Server server, Bus bus) {
JC>     final ReactiveIOInvoker invoker = new ReactiveIOInvoker();
JC>     invoker.setUseStreamingSubscriberIfPossible(true);
JC>     server.getEndpoint().getService().setInvoker(invoker);
JC>   }
JC> }

JC> On the client side (this uses the framework that I mentioned yesterday),
JC> I'm installing a FlowableRxInvokerProvider and an
JC> ObservableRxInvokerProvider. My response keeps showing up empty, even
JC> though I'm returning a Flowable.just(<MyReturnObject>):


JC> 2018-02-23 10:32:07.547 INFO 89838 --- [o-auto-1-exec-1]
JC> o.a.cxf.services.RxJavaHelloImpl.REQ_IN : REQ_IN
JC> Address: http://localhost:64452/api/hello/RxJava
JC> HttpMethod: GET
JC> ExchangeId: 0bb9cac2-7d48-4368-bb30-3c75948b0669
JC> Headers: {Accept=application/json, host=localhost:64452,
JC> connection=keep-alive, cache-control=no-cache, pragma=no-cache,
JC> user-agent=Apache-CXF/3.2.2}


JC> 2018-02-23 10:32:07.575 INFO 89838 --- [o-auto-1-exec-1]
JC> o.a.c.services.RxJavaHelloImpl.RESP_OUT : RESP_OUT
JC> Content-Type: application/json
JC> ResponseCode: 200
JC> ExchangeId: 0bb9cac2-7d48-4368-bb30-3c75948b0669
JC> Headers: {Date=Fri, 23 Feb 2018 15:32:07 GMT, Content-Type=application/json}
JC> Payload: {}

JC> Did I miss a step?

JC> Thanks,

JC> James