You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by meng <ti...@gmail.com> on 2016/10/27 22:19:18 UTC

Problem: Concatenate the different input(from different route) using aggreagationStrategy

Hi, I now design three route to process my input message and want to
concatenate them using my aggregation strategy.Here is my design:
payload ->route1:"direct:start", do multicast to route 2("direct:a") and
route 3("direct:b")
in route 2("direct:a"), do multicast to 8 servers, and use
JoinReplyAggregationStrategy to concatenate the result, lets call the result
as route2result and sent this result to route 3("direct:b")
in route 3("direct:b"), now have two inputs, the original payload from route
1 and the result from route2.
I also want to use the same JoinReplyAggregationStrategy to concatenate
these two parts, but it give me an exception and the output is only the
original payload from route1("direct:start").
Here is the exception:
Caused by: java.lang.NullPointerException: null
	at
mengt.camel.JoinReplyAggregationStrategy.aggregate(JoinReplyAggregationStrategy.java:19)
~[main/:na]
	at
org.apache.camel.processor.aggregate.AggregateProcessor.onAggregation(AggregateProcessor.java:629)
~[camel-core-2.16.1.jar:2.16.1]
	at
org.apache.camel.processor.aggregate.AggregateProcessor.doAggregation(AggregateProcessor.java:447)
~[camel-core-2.16.1.jar:2.16.1]
	... 29 common frames omitted

Here is the code of how to implement my route and aggregation Strategy:
public void configure() throws Exception{
        from("direct:start")
                .marshal()
                .string(UTF_8)
                .multicast()
                .parallelProcessing()
                .to("direct:route1")
                .to("direct:route2")
        ;

        from("direct:route1")
                .multicast()
                .parallelProcessing()
                .aggregationStrategy(new JoinReplyAggregationStrategy())
                .to(TARGET1, TARGET2, TARGET3, TARGET4, TARGET5, TARGET6,
TARGET7, TARGET8)
                //.to("log:?level=INFO&showBody=true")
                .timeout(100000)
                .end()
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception
{
                        String input =
exchange.getIn().getBody(String.class).replaceAll("}\\+\\{", ",");
                        exchange.getOut().setBody(input);
                        System.out.println(input);
                    }
                })
                .to("stream:out")
               // .to("direct:b")
                .to("direct:route2");

       from("direct:route2")
              .aggregate(constant(true), new JoinReplyAggregationStrategy())
               .completionTimeout(500L)
              .to("stream:out")

public class JoinReplyAggregationStrategy implements AggregationStrategy{

    public Exchange aggregate(Exchange exchangeOld, Exchange exchangeNew){
        //String body1 = null;
        //static Logger logger = Logger.getLogger(exchangeOld.getIn());
        if(exchangeOld == null){
            return exchangeNew;
        }else {
            String body1 = exchangeOld.getIn().getBody(String.class);
            String body2 = exchangeNew.getIn().getBody(String.class);
            String status = "Http Status " +
exchangeOld.getIn().getHeader("CamelHttpResponseCode").toString();
            System.out.println(status);
            String merged = (body1 == null) ? body2 : body1 + "+" +body2;
            exchangeOld.getIn().setBody(merged);
            //System.out.println(merged);
            return exchangeOld;
        }
    }
}

Can I use the aggregation Strategy to concatenate the input from other two
routes directly like this? Or my understanding of aggregation is wrong?
Thanks,

Meng



--
View this message in context: http://camel.465427.n5.nabble.com/Problem-Concatenate-the-different-input-from-different-route-using-aggreagationStrategy-tp5789368.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Problem: Concatenate the different input(from different route) using aggreagationStrategy

Posted by meng <ti...@gmail.com>.
Hi Steve, 

Thanks a lot, very helpful! I now can see the result in my
JoinReplyAggregationStrategy that two message have merged when I debugged
it. But after setBody to exchangeOld, it returns nothing. It seems get
nothing after aggregate...

Here is the strategy:
public class JoinReplyAggregationStrategy implements AggregationStrategy{

    public Exchange aggregate(Exchange exchangeOld, Exchange exchangeNew){
        if(exchangeOld == null){
            return exchangeNew;
        }else {
            String body1 = exchangeOld.getIn().getBody(String.class);
            String body2 = exchangeNew.getIn().getBody(String.class);
            String status = "Http Status " +
exchangeOld.getIn().getHeader("CamelHttpResponseCode").toString();
            System.out.println(status);
            String merged = (body2 == null) ? body1 : body1 + "+" +body2;
            exchangeOld.getIn().setBody(merged);
            //System.out.println(merged);
            return exchangeOld;
        }
    }
}

There is no error. ExchangeOld has setBody as merged string in this step,
but in my route2, exchange getIn() is nothing. Seems exchangeOld doesn't
return....
I'm so confused ....

Thanks,
Meng 



--
View this message in context: http://camel.465427.n5.nabble.com/Problem-Concatenate-the-different-input-from-different-route-using-aggreagationStrategy-tp5789368p5789487.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Problem: Concatenate the different input(from different route) using aggreagationStrategy

Posted by Steve973 <st...@gmail.com>.
I think you are right to use completion size.

On Oct 31, 2016 5:18 AM, "meng" <ti...@gmail.com> wrote:

> Hi Steve,
>
> I changed .completionPredicate(header("aggregated").isEqualTo(2)) to
> .completionSize(2) and now I can get the return result.
> But I'm still confused when to use completionPredicate and when to use
> completionSize ?
> Thanks,
>
> Meng
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.
> com/Problem-Concatenate-the-different-input-from-different-route-using-
> aggreagationStrategy-tp5789368p5789489.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>

Re: Problem: Concatenate the different input(from different route) using aggreagationStrategy

Posted by meng <ti...@gmail.com>.
Hi Steve,

I changed .completionPredicate(header("aggregated").isEqualTo(2)) to
.completionSize(2) and now I can get the return result.
But I'm still confused when to use completionPredicate and when to use
completionSize ?
Thanks,

Meng



--
View this message in context: http://camel.465427.n5.nabble.com/Problem-Concatenate-the-different-input-from-different-route-using-aggreagationStrategy-tp5789368p5789489.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Problem: Concatenate the different input(from different route) using aggreagationStrategy

Posted by Steve973 <st...@gmail.com>.
Try this:

In your first processor, you are setting the "out" body, which will strip
off any other exchange values like headers.  Change this to
getIn().setBody(input);

Next, change your first route to the following:

        from("direct:start")
                .setHeader("myCorrelationId", simple("${random(1000)}",
Integer.class))
                .multicast()
                .to("direct:route1", "direct:route2")
                .end()

Then, where you want to aggregate, change the route to:

           from("direct:route2")
                .aggregate(header("myCorrelationId", new
JoinReplyAggregationStrategy()).
completionPredicate(header("aggregated").isEqualTo(2)))
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception
{
                        System.out.println("################");
System.out.println(exchange.getIn().getBody(String.class));
                    }
                });

I didn't compile this, so make sure that there are the correct number of
parentheses, etc.

On Sat, Oct 29, 2016 at 7:59 AM, Steve973 <st...@gmail.com> wrote:

> Yes. You need to set a header on message 1 and message 2 and use it as the
> aggregation id.  The value of this header should be the same value for both
> messages of the pair, but different pairs should have unique ids.
>
> On Oct 29, 2016 4:07 AM, "meng" <ti...@gmail.com> wrote:
>
>> Hi Steve,
>>
>> Thanks for replying.
>> I made some changes, here is the code:
>>
>>         from("direct:start")
>>                 .multicast()
>>                 .to("direct:route1", "direct:route2")
>>                 .end()
>>                 ;
>>
>>         from("direct:route1")
>>                 .multicast()
>>                 .parallelProcessing()
>>                 .aggregationStrategy(new JoinReplyAggregationStrategy())
>>                 .to(TARGET1, TARGET2, TARGET3, TARGET4, TARGET5, TARGET6,
>> TARGET7, TARGET8)
>>
>>                 .end()
>>                 .process(new Processor() {
>>                     @Override
>>                     public void process(Exchange exchange) throws
>> Exception
>> {
>>                         String input =
>> exchange.getIn().getBody(String.class).replaceAll("}\\+\\{", ",");
>>                         exchange.getOut().setBody(input);
>>                         System.out.println("*****************"+input);
>>                     }
>>                 })
>>                .to("direct:route2");
>>
>>            from("direct:route2")
>>                 .process(new Processor() {
>>                     @Override
>>                     public void process(Exchange exchange) throws
>> Exception
>> {
>>                         System.out.println("################");
>>
>> System.out.println(exchange.getIn().getBody(String.class));
>>                     }
>>                 });
>>
>> Let's say 'direct:start' broadcasts message1 to 'direct:route1' and
>> 'direct:route2'.
>> 'direct:route2' does some processing and get a result message2 then sent
>> it
>> to 'direct:route2'.
>> So 'direct:route2' now receive two massages. I can now print both message1
>> and message2. But I want to concatenate these two messages in
>> 'direct:route2' as one. Is there any way?
>>
>> Thanks,
>> Meng
>>
>>
>>
>> --
>> View this message in context: http://camel.465427.n5.nabble.
>> com/Problem-Concatenate-the-different-input-from-different-
>> route-using-aggreagationStrategy-tp5789368p5789426.html
>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>
>

Re: Problem: Concatenate the different input(from different route) using aggreagationStrategy

Posted by Steve973 <st...@gmail.com>.
Yes. You need to set a header on message 1 and message 2 and use it as the
aggregation id.  The value of this header should be the same value for both
messages of the pair, but different pairs should have unique ids.

On Oct 29, 2016 4:07 AM, "meng" <ti...@gmail.com> wrote:

> Hi Steve,
>
> Thanks for replying.
> I made some changes, here is the code:
>
>         from("direct:start")
>                 .multicast()
>                 .to("direct:route1", "direct:route2")
>                 .end()
>                 ;
>
>         from("direct:route1")
>                 .multicast()
>                 .parallelProcessing()
>                 .aggregationStrategy(new JoinReplyAggregationStrategy())
>                 .to(TARGET1, TARGET2, TARGET3, TARGET4, TARGET5, TARGET6,
> TARGET7, TARGET8)
>
>                 .end()
>                 .process(new Processor() {
>                     @Override
>                     public void process(Exchange exchange) throws Exception
> {
>                         String input =
> exchange.getIn().getBody(String.class).replaceAll("}\\+\\{", ",");
>                         exchange.getOut().setBody(input);
>                         System.out.println("*****************"+input);
>                     }
>                 })
>                .to("direct:route2");
>
>            from("direct:route2")
>                 .process(new Processor() {
>                     @Override
>                     public void process(Exchange exchange) throws Exception
> {
>                         System.out.println("################");
>
> System.out.println(exchange.getIn().getBody(String.class));
>                     }
>                 });
>
> Let's say 'direct:start' broadcasts message1 to 'direct:route1' and
> 'direct:route2'.
> 'direct:route2' does some processing and get a result message2 then sent it
> to 'direct:route2'.
> So 'direct:route2' now receive two massages. I can now print both message1
> and message2. But I want to concatenate these two messages in
> 'direct:route2' as one. Is there any way?
>
> Thanks,
> Meng
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.
> com/Problem-Concatenate-the-different-input-from-different-route-using-
> aggreagationStrategy-tp5789368p5789426.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>

Re: Problem: Concatenate the different input(from different route) using aggreagationStrategy

Posted by meng <ti...@gmail.com>.
Hi Steve, 

Thanks for replying. 
I made some changes, here is the code: 

        from("direct:start") 
                .multicast() 
                .to("direct:route1", "direct:route2") 
                .end() 
                ; 

        from("direct:route1") 
                .multicast() 
                .parallelProcessing() 
                .aggregationStrategy(new JoinReplyAggregationStrategy()) 
                .to(TARGET1, TARGET2, TARGET3, TARGET4, TARGET5, TARGET6,
TARGET7, TARGET8) 

                .end() 
                .process(new Processor() { 
                    @Override 
                    public void process(Exchange exchange) throws Exception
{ 
                        String input =
exchange.getIn().getBody(String.class).replaceAll("}\\+\\{", ","); 
                        exchange.getOut().setBody(input); 
                        System.out.println("*****************"+input); 
                    } 
                }) 
               .to("direct:route2"); 

           from("direct:route2") 
                .process(new Processor() { 
                    @Override 
                    public void process(Exchange exchange) throws Exception
{ 
                        System.out.println("################"); 
                       
System.out.println(exchange.getIn().getBody(String.class)); 
                    } 
                }); 

Let's say 'direct:start' broadcasts message1 to 'direct:route1' and
'direct:route2'. 
'direct:route2' does some processing and get a result message2 then sent it
to 'direct:route2'. 
So 'direct:route2' now receive two massages. I can now print both message1
and message2. But I want to concatenate these two messages in
'direct:route2' as one. Is there any way? 

Thanks, 
Meng 



--
View this message in context: http://camel.465427.n5.nabble.com/Problem-Concatenate-the-different-input-from-different-route-using-aggreagationStrategy-tp5789368p5789426.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Problem: Concatenate the different input(from different route) using aggreagationStrategy

Posted by Steve973 <st...@gmail.com>.
Sorry...  I see what you are doing.  Which is line 19?

On Fri, Oct 28, 2016 at 3:32 PM, Steve973 <st...@gmail.com> wrote:

> There is no way that you can aggregate like that.  Both messages need to
> be sent to the same endpoint, and they need to have some sort of
> aggregation id in order for the aggregation to know which messages to
> combine.  Let me know if this is unclear or if you need more information.
>
> On Thu, Oct 27, 2016 at 7:06 PM, meng <ti...@gmail.com> wrote:
>
>> I change to  .aggregate(simple("${body}"), new
>> JoinReplyAggregationStrategy()), now no exceptions, but only returns the
>> second exchange from route 2. No original payload comes in to
>> aggregate....
>>
>>
>>
>> --
>> View this message in context: http://camel.465427.n5.nabble.
>> com/Problem-Concatenate-the-different-input-from-different-
>> route-using-aggreagationStrategy-tp5789368p5789370.html
>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>
>
>

Re: Problem: Concatenate the different input(from different route) using aggreagationStrategy

Posted by Steve973 <st...@gmail.com>.
There is no way that you can aggregate like that.  Both messages need to be
sent to the same endpoint, and they need to have some sort of aggregation
id in order for the aggregation to know which messages to combine.  Let me
know if this is unclear or if you need more information.

On Thu, Oct 27, 2016 at 7:06 PM, meng <ti...@gmail.com> wrote:

> I change to  .aggregate(simple("${body}"), new
> JoinReplyAggregationStrategy()), now no exceptions, but only returns the
> second exchange from route 2. No original payload comes in to aggregate....
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.
> com/Problem-Concatenate-the-different-input-from-different-route-using-
> aggreagationStrategy-tp5789368p5789370.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>

Re: Problem: Concatenate the different input(from different route) using aggreagationStrategy

Posted by meng <ti...@gmail.com>.
I change to  .aggregate(simple("${body}"), new
JoinReplyAggregationStrategy()), now no exceptions, but only returns the
second exchange from route 2. No original payload comes in to aggregate....



--
View this message in context: http://camel.465427.n5.nabble.com/Problem-Concatenate-the-different-input-from-different-route-using-aggreagationStrategy-tp5789368p5789370.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Problem: Concatenate the different input(from different route) using aggreagationStrategy

Posted by meng <ti...@gmail.com>.
Hi Steve, 

Thanks for replying.
I made some changes, here is the code:

        from("direct:start")
                .multicast()
                .to("direct:route1", "direct:route2")
                .end()
                ;

        from("direct:route1")
                .multicast()
                .parallelProcessing()
                .aggregationStrategy(new JoinReplyAggregationStrategy())
                .to(TARGET1, TARGET2, TARGET3, TARGET4, TARGET5, TARGET6,
TARGET7, TARGET8)

                .end()
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception
{
                        String input =
exchange.getIn().getBody(String.class).replaceAll("}\\+\\{", ",");
                        exchange.getOut().setBody(input);
                        System.out.println("*****************"+input);
                    }
                })
               .to("direct:route2");

           from("direct:route2")
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception
{
                        System.out.println("################");
                       
System.out.println(exchange.getIn().getBody(String.class));
                    }
                });

Let's say 'direct:start' broadcasts message1 to 'direct:route1' and
'direct:route2'. 
'direct:route2' does some processing and get a result message2 then sent it
to 'direct:route2'. 
So 'direct:route2' now receive two massages. I can now print both message1
and message2. But I want to concatenate these two messages in
'direct:route2' as one. Is there any way?

Thanks,
Meng




--
View this message in context: http://camel.465427.n5.nabble.com/Problem-Concatenate-the-different-input-from-different-route-using-aggreagationStrategy-tp5789368p5789425.html
Sent from the Camel - Users mailing list archive at Nabble.com.