Posted to users@camel.apache.org by Hrvoje Djurdjevic <hr...@gmail.com> on 2018/04/24 15:20:28 UTC

What route returns

Hi,

I noticed that if I split the body without specifying an aggregation strategy,
like this: split().body()
the route returns the message as it was before splitting.
However, if I pass an aggregation strategy to the split call, like
this: .split(body(), new AgrStrat())
then the route returns the aggregated message. The problem starts when I do
further work in the route after an aggregator that uses the same strategy
as the splitter, for example unmarshalling the aggregated message from
fixed-length format and marshalling it to JSON. These steps run correctly,
which I can verify by logging the body, but the REST route that calls this
route still receives the aggregated message in fixed-length format.
So I see JSON printed in the Java console, but in Postman or a browser
I get the fixed-length format.
Does that mean I have to implement the fixed-length to JSON conversion
in the aggregation strategy class, or is there still a way to do it
in the route builder class?

The outline of my route is like this:

from(getDirectRouteId(id))
        .routeId(id)
        .setBody(simple("original request message"))
        .setHeader("CamelJmsDestinationName", constant("queue://QM_TEST/inputq?targetClient=1"))
        .to(ExchangePattern.InOut, "websphere:queue:SYSTEM.DEFAULT.LOCAL.QUEUE?useMessageIDAsCorrelationID=true&replyTo=REPLYQ")
        .unmarshal(dataFormat)
        .split(body(), new AgrStrat()).parallelProcessing()
        .process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                // assemble new MQ requests based on the content of the split
                // MQ response message
            }
        })
        .setHeader("CamelJmsDestinationName", constant("queue://QM_TEST/inputq?targetClient=1"))
        .to(ExchangePattern.InOut, "websphere:queue:SYSTEM.DEFAULT.LOCAL.QUEUE?useMessageIDAsCorrelationID=true&replyTo=REPLYQ")
        .aggregate(constant(true), new AgrStrat()).completionTimeout(3000)
        .log("${body}")
        .unmarshal(dataFormat1)
        .marshal().json(JsonLibrary.Jackson)
        .log("${body}");

At the moment AgrStrat simply concatenates the body of newExchange onto the
body of oldExchange, inserting "\n" in between.
I guess I should move the format conversion there? Thank you.
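
The concatenation that AgrStrat is described as doing can be sketched in plain Java. This is only a model of the behaviour, not Camel's API and not the poster's actual class: the names ConcatModel, aggregate and aggregateAll are hypothetical, and message bodies are reduced to plain strings. It assumes the usual contract in which the strategy is called once per reply and the "old" side is null on the first call.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of a concatenating aggregation strategy (hypothetical, no Camel dependency). */
public class ConcatModel {

    // Models aggregate(oldExchange, newExchange) on message bodies only:
    // oldBody is null on the first call, so the first reply is returned unchanged.
    public static String aggregate(String oldBody, String newBody) {
        if (oldBody == null) {
            return newBody;
        }
        return oldBody + "\n" + newBody;
    }

    // Folds all replies into a single body by calling aggregate once per reply.
    public static String aggregateAll(List<String> replies) {
        String result = null;
        for (String reply : replies) {
            result = aggregate(result, reply);
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints the three replies on separate lines.
        System.out.println(aggregateAll(Arrays.asList("reply-1", "reply-2", "reply-3")));
    }
}
```

The result is still a single newline-separated text body, so any conversion to another format can be applied once to that aggregated body rather than inside the strategy.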

Re: What route returns

Posted by Hrvoje Djurdjevic <hr...@gmail.com>.
Hi Zoran,

I replaced the line:

.aggregate(constant(true),new AgrStrat()).completionTimeout(3000)

with:

.end()

and got the expected result. Thank you very much!
In short, the route is called by a REST route. It issues one MQ request
based on the parameters received from the REST request,
splits the response to that first request, then issues another
type of MQ request for each split message,
and finally aggregates all the responses and reformats the result to JSON.

Cheers and stay well,
Hrvoje

On Wed, Apr 25, 2018 at 2:11 PM, Zoran Regvart <zo...@regvart.com> wrote:

> [...]

Re: What route returns

Posted by Zoran Regvart <zo...@regvart.com>.
Hi Hrvoje,
there is an example[1] in the documentation that does something
similar to what I think you want to do. The splitter docs[2] also
have a note on what the splitter step returns.

Here is a super simple example to illustrate this:

    from("timer:tick?period=1s")
        .setBody(constant("a b c"))
        .split(body().tokenize(" "), AggregationStrategies.groupedBody())
            .transform(simple("*${body}*"))
        .end()
        .marshal().json(JsonLibrary.Jackson)
        .log("${body}");

Here the body `"a b c"` is split on the space token, and the splitter
aggregates the results by collecting the body of each child exchange into a
java.util.List. The processing itself is trivial: it converts `"x"` to
`"*x*"`. The next processor after the splitter then receives a java.util.List
of the bodies produced by the transform step. In this example that processor
marshals the java.util.List to a JSON array, so the output should be
`["*a*","*b*","*c*"]`.
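
For readers who want to trace the data flow without running Camel, the same steps can be modelled in plain Java. This is a rough sketch under stated assumptions, not how Camel implements the splitter: SplitModel and its methods are hypothetical names, the loop stands in for the steps nested between split(...) and end(), and toJsonArray is a hand-written stand-in for marshal().json(...) that only handles a list of strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of split + transform + grouped aggregation (hypothetical, no Camel dependency). */
public class SplitModel {

    // Models the split step: tokenize on spaces, transform each part to "*part*",
    // and group the transformed bodies into one list, which is what the step returns.
    public static List<String> splitTransformGroup(String body) {
        List<String> grouped = new ArrayList<>();
        for (String part : body.split(" ")) {
            grouped.add("*" + part + "*");
        }
        return grouped;
    }

    // Models the step after end(): it runs once, on the grouped list,
    // and renders it as a JSON array of strings.
    public static String toJsonArray(List<String> items) {
        StringBuilder json = new StringBuilder("[");
        for (int i = 0; i < items.size(); i++) {
            if (i > 0) {
                json.append(',');
            }
            String escaped = items.get(i).replace("\\", "\\\\").replace("\"", "\\\"");
            json.append('"').append(escaped).append('"');
        }
        return json.append(']').toString();
    }

    public static void main(String[] args) {
        // Prints ["*a*","*b*","*c*"]
        System.out.println(toJsonArray(splitTransformGroup("a b c")));
    }
}
```

The point of the model is the ordering: the transformation happens per part inside the split, while the JSON conversion happens once afterwards on whatever the split step hands on.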

I'm not 100% sure I follow the logic of the route you provided, but it
seems to me that the second JMS processor, i.e. the one followed by
the `aggregate`, should be nested within the split step.

Just remember that when you use the split step with an
AggregationStrategy, it is the split step itself that aggregates the
results. A while back Torsten Mielke wrote a really good explanation
of the aggregator[3] that you might want to take a look at.

zoran

[1] https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/split-eip.adoc#split-aggregate-requestreply-sample
[2] https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/split-eip.adoc#what-the-splitter-returns
[3] https://tmielke.blogspot.de/2009/01/using-camel-aggregator-correctly.html




-- 
Zoran Regvart