You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@cxf.apache.org by KJQ <kj...@logicdrop.com> on 2020/01/21 00:12:47 UTC

Consume a Flux using Project Reactor and JAX-RS

*I can return Mono or Flux fine from CXF JAX-RS endpoints but can I consume a
Reactor type (Flux) as well?*  I am running with CXF using your reactor
extensions inside SpringBoot and embedded Undertow (latest versions of
everything including CXF).

Something along the lines of this below?  If this is completely off-base
just let me know, this is the first time I am trying to consume a reactive
type and not just return it.

If this has something to do with mixing SpringBoot, CXF, and Reactor I think
I can get around it by taking in the data as I would of restfully then
publishing that internally to a consumer as a flux.

    @POST
    @Path("/import")
    @Produces(APPLICATION_JSON)
    @Consumes(APPLICATION_JSON)
    //@Consumes("application/stream+json")
    public Flux<Integer> importData(Flux<Map&lt;String, Object>> request)
    {
        final AtomicInteger i = new AtomicInteger(0);

        return request
            .map(r -> {
                return Integer.valueOf(i.getAndIncrement());
            });
    }

*Right now, it throws this exception:*

 at
com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67)
        at
com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1589)
        at
com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1055)
        at
com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserialize(AbstractDeserializer.java:265)
        at
com.fasterxml.jackson.databind.ObjectReader._bind(ObjectReader.java:1682)
        at
com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:977)
        at
com.fasterxml.jackson.jaxrs.base.ProviderBase.readFrom(ProviderBase.java:814)
        at
org.apache.cxf.jaxrs.utils.JAXRSUtils.readFromMessageBodyReader(JAXRSUtils.java:1429)
        at
org.apache.cxf.jaxrs.utils.JAXRSUtils.readFromMessageBody(JAXRSUtils.java:1381)
        at
org.apache.cxf.jaxrs.utils.JAXRSUtils.processRequestBodyParameter(JAXRSUtils.java:896)
        at
org.apache.cxf.jaxrs.utils.JAXRSUtils.processParameters(JAXRSUtils.java:832)
        at
org.apache.cxf.jaxrs.interceptor.JAXRSInInterceptor.processRequest(JAXRSInInterceptor.java:214)
        at
org.apache.cxf.jaxrs.interceptor.JAXRSInInterceptor.handleMessage(JAXRSInInterceptor.java:78)




--
Sent from: http://cxf.547215.n5.nabble.com/cxf-user-f547216.html