Posted to users@camel.apache.org by Ravi Nallappan <ra...@gmail.com> on 2015/06/23 13:11:26 UTC

Camel 2.15.1 Aggregator: Using multiple to() endpoints

Hi,

I am trying to create modular routes that are stitched together at the end by a
main route. This strategy works for most of my other routing, except
when I use an aggregator.

I have created a simple test case for this scenario:

<pre>
package com.ravi.test;

import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class AggregateRouteTest extends CamelTestSupport {
  @Override
  protected RouteBuilder createRouteBuilder() {
    return new RouteBuilder() {
      @Override
      public void configure() {
        // @formatter:off
        from("direct://test-combine")
          .to("direct://combine")
//        .to("mock:result") /* 1 */
        ;

        from("direct://combine")
          .aggregate(header("myId"), new BeanAggregationStrategy())
            .completionSize(3)
            .completionTimeout(3000)
          .to("mock:result") /* 2 */
        ;
        // @formatter:on
      }

      class BeanAggregationStrategy implements AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
          if (oldExchange == null) {
            return newExchange;
          }
          String oldBody = oldExchange.getIn().getBody(String.class);
          String newBody = newExchange.getIn().getBody(String.class);
          String body = oldBody + newBody;
          oldExchange.getIn().setBody(body);
          return oldExchange;
        }
      }
    };
  }

  @Test
  public void simpleTest() throws Exception {
    MockEndpoint mock = getMockEndpoint("mock:result");
    mock.expectedBodiesReceived("ABC");
    template.sendBodyAndHeader("direct://test-combine", "A", "myId", 1);
    template.sendBodyAndHeader("direct://test-combine", "B", "myId", 1);
    template.sendBodyAndHeader("direct://test-combine", "C", "myId", 1); /* 3 */
    assertMockEndpointsSatisfied();
  }
}
</pre>

This code works and the test case passes. However, if I uncomment line /* 1 */ and
comment out line /* 2 */ above, the test case fails.

The route for the "direct://combine" endpoint contains the aggregation logic, while
"direct://test-combine" and "mock:result" are supposed to be a wrapper for
testing that route. However, this does not work: "mock:result" has to be
part of the "direct://combine" route.

Even with /* 3 */ commented out (i.e. an incomplete set of messages to
aggregate), the code above waits a while for the last message, whereas
the modified version fails without waiting at all (is the aggregator not
kicking in at all?).

I would appreciate it if anyone could explain the reason, and whether it is
still possible to keep "direct://combine" modular, i.e. keep the combine
endpoint's route free of any details about the routes before and after it.

Thanks and regards,
Ravi Nallappan

Re: Camel 2.15.1 Aggregator: Using multiple to() endpoints

Posted by Ravi Nallappan <ra...@gmail.com>.
Sorry, this may be a better version of the test, using setMsgId():
<pre>
@Test
public void request01Test() throws Exception {
    final PDUBean bean1 = new PDUBean();
    bean1.setMsgId("1");

    final PDUBean bean2 = new PDUBean();
    bean2.setMsgId("2");

    PDUBean rslt1 = template.requestBody(bean1, PDUBean.class);
    PDUBean rslt2 = template.requestBody(bean2, PDUBean.class);
    assertEquals("Found 1 0", rslt1.getErrMsg());
    assertEquals("Found 2 1", rslt2.getErrMsg());
}
</pre>


Re: Camel 2.15.1 Aggregator: Using multiple to() endpoints

Posted by Ravi Nallappan <ra...@gmail.com>.
Thanks Claus.

OK, I have changed my design to daisy-chain the routes instead.

From
====

Maven Camel Project A: from(direct://<ModuleA>).<processing>
Maven Camel Project B: from(direct://<ModuleB>).<processing>
Maven Camel Project C: from(direct://<ModuleC>).<processing>

Maven Camel Project Main: from(<consumer>).to(direct://<ModuleA>).to(direct://<ModuleB>).to(direct://<ModuleC>)

To
==

Maven Camel Project A: from(direct://<ModuleA>-in).<processing>.to(direct://<ModuleA>-out)
Maven Camel Project B: from(direct://<ModuleB>-in).<processing>.to(direct://<ModuleB>-out)
Maven Camel Project C: from(direct://<ModuleC>-in).<processing>.to(direct://<ModuleC>-out)

Maven Camel Project Main:
from(<consumer>).to(direct://<ModuleA>-in)
from(direct://<ModuleA>-out).to(direct://<ModuleB>-in)
from(direct://<ModuleB>-out).to(direct://<ModuleC>-in)
from(direct://<ModuleC>-out).log(<non essential>)

It seems to be working fine now. (Let me know if there are any red
flags.)

Another related question: my consumer now expects a response to send
back to the external system (request-reply). From what I have observed so far,
every message that is sent also gets the correct reply despite being aggregated
(combined by aggregate), i.e. it somehow knows exactly which payload needs to
be returned for each request (I am using a bean as the payload).

Is this expected behaviour? Do the 'discarded' Exchanges
(inside AggregationStrategy.aggregate) wait for the 'retained' Exchange to go
all the way forward through the route and back before they are returned? And
does Camel do anything to figure out how to 'split' the message again? (I am
taking a blind guess here.)

I hope the following Camel test snippet clarifies what I am referring to.
<pre>
//...
@Override
protected RouteBuilder createRouteBuilder() {
    return new CombineRouteBuilder() {
        @Override
        public void configure() {
            super.configure();
            from("direct://test-combine").to("direct://combine-in");
            from("direct://combine-out").process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    int i = 0;
                    for (PDUBean b : exchange.getIn().getBody(PDUBean.class)) {
                        b.setErrMsg("Found " + b.getMsgId() + " " + i++);
                    }
                }
            }).to("mock:result");
        }
    };
}
//...
@Test
public void request01Test() throws Exception {
    final PDUBean bean1 = new PDUBean();
    bean1.setMessage("Hello, World! 1");

    final PDUBean bean2 = new PDUBean();
    bean2.setMessage("Hello, World! 2");

    PDUBean rslt1 = template.requestBody(bean1, PDUBean.class);
    PDUBean rslt2 = template.requestBody(bean2, PDUBean.class);
    assertEquals("Found 1 0", rslt1.getErrMsg());
    assertEquals("Found 2 1", rslt2.getErrMsg());
}
</pre>


Thanks and regards,
Ravi Nallappan




Re: Camel 2.15.1 Aggregator: Using multiple to() endpoints

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

It is a bit hard to explain without writing endlessly long emails.

This is expected: the aggregator uses a separate "leg" of the routing
when it sends completed outgoing messages, i.e. its input and output
legs are separated.
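
To make the "two legs" idea concrete, here is a minimal plain-Java sketch. It
is only a toy model and not Camel's implementation: the names TwoLegsSketch,
ToyAggregator, accept() and run() are hypothetical and exist purely for
illustration. The input leg hands each message straight back to the caller,
while the combined message only ever travels down the output leg.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model of an aggregator with separate input and output legs (not Camel code). */
public class TwoLegsSketch {

    /** Hypothetical stand-in for an aggregator that completes on a fixed size. */
    static final class ToyAggregator {
        private final int completionSize;
        private final Consumer<String> outputLeg;
        private final Map<Object, List<String>> pending = new HashMap<>();

        ToyAggregator(int completionSize, Consumer<String> outputLeg) {
            this.completionSize = completionSize;
            this.outputLeg = outputLeg;
        }

        /** Input leg: store the body, then return the same body to the caller. */
        String accept(Object correlationKey, String body) {
            List<String> group = pending.computeIfAbsent(correlationKey, k -> new ArrayList<>());
            group.add(body);
            if (group.size() == completionSize) {
                // Output leg: only the completed, combined message goes this way.
                pending.remove(correlationKey);
                outputLeg.accept(String.join("", group));
            }
            return body;
        }
    }

    /** Sends A, B, C with one key and returns what each leg observed. */
    public static List<List<String>> run() {
        List<String> callerLeg = new ArrayList<>(); // plays the role of position /* 1 */
        List<String> outputLeg = new ArrayList<>(); // plays the role of position /* 2 */
        ToyAggregator aggregator = new ToyAggregator(3, outputLeg::add);
        for (String body : new String[] {"A", "B", "C"}) {
            callerLeg.add(aggregator.accept(1, body));
        }
        return Arrays.asList(callerLeg, outputLeg);
    }

    public static void main(String[] args) {
        List<List<String>> seen = run();
        System.out.println("caller leg saw: " + seen.get(0)); // [A, B, C]
        System.out.println("output leg saw: " + seen.get(1)); // [ABC]
    }
}
```

In this model, a step placed after the call into the aggregating route sees
"A", "B" and "C" individually and never "ABC". That would be consistent with
the test failing when "mock:result" is moved to position /* 1 */.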





-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/