Posted to users@camel.apache.org by Aleksander Pena <al...@gmail.com> on 2012/08/12 14:31:00 UTC

Splitter + aggregator + dynamic timeout

Hi guys,

I have some specific requirements. An incoming request should be split into
identical requests that are sent to different sources in parallel. The sources
(legacy and new) are almost compatible with each other, but the legacy one
responds much more slowly than the new one. To prevent degradation of the whole
service, the incoming request will carry a timeout attribute that is set
dynamically to the required value. So when the legacy system does not respond
in time, the service should ignore its response and return information about
the timeout.
I wonder which EIP components can be used for such a service?
I created the following route using a splitter and an aggregator, because the
aggregator has a completionTimeout that accepts a dynamic argument:



Unfortunately the results are not what I expected. Please see the excerpt from
the console logs below. Responses from the sources are aggregated correctly,
but the result stays inside the Exchange in thread #3 and is not returned to
the caller's main thread.



Could you advise what could be wrong with my route? I'm not sure about the
places where I put the end() elements. I also saw in the documentation that the
only way to return a result from the splitter to the main thread is to use its
aggregation strategy, but maybe there is some workaround for this?
I'm using Camel 2.8.0-fuse-01-13.

Thanks.
Alek
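
The requirement described in this post (call several sources in parallel,
wait no longer than a per-request timeout, report a timeout for any source
that is too slow) can be sketched without Camel, using only the JDK. This is
a minimal illustration under assumptions, not the route from this thread:
the class name, the source names and the "TIMEOUT" marker are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FanOutWithTimeout {

    /** Calls every source in parallel and waits at most timeoutMillis for all of them. */
    static Map<String, String> callAll(Map<String, Callable<String>> sources, long timeoutMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, sources.size()));
        try {
            List<String> names = new ArrayList<>(sources.keySet());
            List<Callable<String>> tasks = new ArrayList<>(sources.values());
            // invokeAll returns the futures in task order; tasks that are still
            // running when the timeout expires are cancelled.
            List<Future<String>> futures = pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            Map<String, String> results = new LinkedHashMap<>();
            for (int i = 0; i < names.size(); i++) {
                try {
                    results.put(names.get(i), futures.get(i).get());
                } catch (CancellationException e) {
                    // Hypothetical marker for "this source did not answer in time".
                    results.put(names.get(i), "TIMEOUT");
                } catch (ExecutionException e) {
                    results.put(names.get(i), "ERROR: " + e.getCause());
                }
            }
            return results;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Callable<String>> sources = new LinkedHashMap<>();
        sources.put("new", () -> "new-response");
        sources.put("legacy", () -> { Thread.sleep(5000); return "legacy-response"; });
        // The timeout would come from the incoming request; 300 ms is just an example value.
        System.out.println(callAll(sources, 300));
    }
}
```

The caller stays blocked only until the deadline, so a slow legacy source
cannot delay the reply beyond the timeout carried by the request.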



--
View this message in context: http://camel.465427.n5.nabble.com/Splitter-aggregator-dynamic-timeout-tp5717166.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Splitter + aggregator + dynamic timeout

Posted by Henrique Viecili <he...@myreks.com>.
Sorry to revive this post, but I am facing a similar problem.

I see you are doing a split/aggregation with *dynamic timeout* coming from
some property. What I need is to specify the timeout based on the
content/header of the message via Expression... is it possible?

Regards,
*Henrique Viecili*

On Thu, Aug 16, 2012 at 7:05 AM, Babak Vahdat
<ba...@swissonline.ch> wrote:

>
> On 16.08.12 11:31, "Aleksander Pena" <al...@gmail.com> wrote:
>
> >Hi Babak,
> >
> >your solution works fine but you removed parallel processing which is
> >important for me :)
>
> Just be aware that if you use the parallelProcessing option you have
> no guarantee about the *order* of the outcomes. As an example, try
> running the following unit test and see what the content of the file
> "target/concurrent/outbox/result.txt" looks like:
>
> https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java
>
> And its content can even differ on each run of the test!
>
> >Anyway I found solution for my original problem:
>
> Happy to hear that.

Re: Splitter + aggregator + dynamic timeout

Posted by Babak Vahdat <ba...@swissonline.ch>.
On 16.08.12 11:31, "Aleksander Pena" <al...@gmail.com> wrote:

>Hi Babak,
>
>your solution works fine but you removed parallel processing which is
>important for me :)

Just be aware that if you use the parallelProcessing option you have
no guarantee about the *order* of the outcomes. As an example, try
running the following unit test and see what the content of the file
"target/concurrent/outbox/result.txt" looks like:

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java

And its content can even differ on each run of the test!
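
The ordering point can also be seen outside Camel. The following is a
plain-JDK sketch (class and method names are made up for illustration): when
parts are processed in parallel, results arrive in completion order, which
may change from run to run, while collecting the futures in submission order
restores the original order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

class ParallelOrderDemo {

    /** Simulated work with a small random delay, so completion order varies. */
    static String work(String item) throws InterruptedException {
        Thread.sleep(ThreadLocalRandom.current().nextInt(20));
        return item.toLowerCase();
    }

    /** Results in the order in which the parallel tasks happen to finish. */
    static List<String> inArrivalOrder(List<String> items, ExecutorService pool) throws Exception {
        CompletionService<String> done = new ExecutorCompletionService<>(pool);
        for (String item : items) {
            Callable<String> task = () -> work(item);
            done.submit(task);
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            out.add(done.take().get());
        }
        return out;
    }

    /** Results in the order of the input, regardless of which task finishes first. */
    static List<String> inSubmissionOrder(List<String> items, ExecutorService pool) throws Exception {
        List<Future<String>> futures = new ArrayList<>();
        for (String item : items) {
            Callable<String> task = () -> work(item);
            futures.add(pool.submit(task));
        }
        List<String> out = new ArrayList<>();
        for (Future<String> f : futures) {
            out.add(f.get());
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<String> items = Arrays.asList("A", "B", "C");
            System.out.println("arrival order:    " + inArrivalOrder(items, pool));
            System.out.println("submission order: " + inSubmissionOrder(items, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

An aggregation that depends on order (such as building "1+2+3") behaves like
the first method when the parts are processed in parallel.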

>Anyway I found solution for my original problem:

Happy to hear that.




Re: Splitter + aggregator + dynamic timeout

Posted by Aleksander Pena <al...@gmail.com>.
Hi Babak,

your solution works fine, but you removed parallel processing, which is
important for me :)
Anyway, I found a solution to my original problem:

1. Before processing starts, I store the current exchange (from the main
thread) as a property (myEx). The myEx property is then propagated to the
splitter threads.
2. After aggregation has finished (in a splitter thread), I retrieve the
'myEx' exchange from the properties and set a new property on it holding the
aggregation results (myRes).
3. After the whole processing (when I'm in the main thread again), I
retrieve the myRes property from the exchange and set it as the body :)
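
The three steps above can be sketched in plain Java, without Camel. Msg is a
hypothetical stand-in for an Exchange (a body plus a property map), and the
"aggregation" here is simply a sum so that the result does not depend on
thread scheduling; only the property names myEx and myRes come from the
steps above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ParentExchangeWorkaround {

    /** Hypothetical stand-in for an exchange: a body plus properties. */
    static class Msg {
        volatile Object body;
        final Map<String, Object> properties = new ConcurrentHashMap<>();
        Msg(Object body) { this.body = body; }
    }

    static Object process(String input) throws InterruptedException {
        Msg parent = new Msg(input);
        // Step 1: keep a reference to the parent as a property before splitting.
        parent.properties.put("myEx", parent);

        String[] parts = input.split(",");
        AtomicInteger sum = new AtomicInteger();
        AtomicInteger remaining = new AtomicInteger(parts.length);
        ExecutorService pool = Executors.newFixedThreadPool(parts.length);
        for (String part : parts) {
            Msg child = new Msg(part.trim());
            // The properties (including myEx) are copied to every part.
            child.properties.putAll(parent.properties);
            pool.execute(() -> {
                sum.addAndGet(Integer.parseInt((String) child.body));
                if (remaining.decrementAndGet() == 0) {
                    // Step 2: aggregation is complete, still on a worker thread.
                    Msg main = (Msg) child.properties.get("myEx");
                    main.properties.put("myRes", sum.get());
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        // Step 3: back on the calling thread, promote myRes to the body.
        parent.body = parent.properties.get("myRes");
        return parent.body;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(process("1,2,3")); // prints 6
    }
}
```

The approach relies on the caller waiting until all parts are done before it
reads myRes; without that wait the property may not be set yet.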

Thanks guys for all your help,
Alek





Re: Splitter + aggregator + dynamic timeout

Posted by Babak Vahdat <ba...@swissonline.ch>.
Hi Alex,

Regarding the "open question" in your mail below, please find a slightly
modified version of your routing which now passes. See also my "XXX"
comments. Hope this helps.

Babak

// Imports assume the Camel 2.x package layout (camel-core, camel-test) and JUnit 4.
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class SplitterWithAggregatorTest extends CamelTestSupport {

    @Test
    public void shouldProcessCorrectlyOnBothSources() throws Exception {
        MockEndpoint split = getMockEndpoint("mock:split");
        split.expectedBodiesReceivedInAnyOrder("1", "2", "3");

        MockEndpoint result = getMockEndpoint("mock:result2");
        result.expectedBodiesReceived("1+2+3");

        template.requestBody("direct:start", "A,B,C");
        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {

                from("direct:start")
                    .log("start body: ${body}")
                    .to("direct:process")
                    .log("result body: ${body}")
                    .to("mock:result");

                from("direct:process")
                    .split(body())
                    // .parallelProcessing() XXX: no parallelProcessing, as otherwise there is
                    // no guarantee for the expected order (we want the data flow "1", then
                    // "2" and at last "3")
                        .log("Split line ${body}")
                        .bean(new Responder())
                        .to("mock:split")
                    .aggregate(header("myId"), new MyAggregationStrategy())
                        .completionSize(3)
                        .log("aggregated ${body}")
                        .log("completed by ${property.CamelAggregatedCompletedBy}")
                    .log("test body: ${body}")
                    .to("direct:result2");

                // XXX: a third route where we get the final result of the whole process
                from("direct:result2")
                    // XXX: any possible final processing goes here
                    // ...
                    .to("mock:result2");
            }
        };
    }

    public class Responder {

        // Sets the same correlation id on every part and maps A/B/C to 1/2/3.
        public String translate(Exchange ex, String key) {
            ex.getIn().setHeader("myId", "correlation id 1");
            if ("A".equals(key)) {
                return "1";
            } else if ("B".equals(key)) {
                return "2";
            } else {
                return "3";
            }
        }
    }

    public class MyAggregationStrategy implements AggregationStrategy {

        // Joins the bodies with "+"; the first exchange is returned unchanged.
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            }
            String body = newExchange.getIn().getBody(String.class);
            String existing = oldExchange.getIn().getBody(String.class);

            oldExchange.getIn().setBody(existing + "+" + body);
            return oldExchange;
        }

    }
}




Re: Splitter + aggregator + dynamic timeout

Posted by Babak Vahdat <ba...@swissonline.ch>.
Hi

I WAS WRONG with my bug assumption, see the answer here:

http://camel.465427.n5.nabble.com/Is-this-routing-behaviour-as-expected-td5717331.html

Sorry for the noise & confusion.

Babak



Re: Splitter + aggregator + dynamic timeout

Posted by Henryk Konsek <he...@gmail.com>.
Hi Olek,

I'll take a look both at the timeout in my solution and at the possible bug
in your original one. I'll probably have some spare time tomorrow to
analyze these issues.

Laters.

-- 
Henryk Konsek
http://henryk-konsek.blogspot.com

Re: Splitter + aggregator + dynamic timeout

Posted by Aleksander Pena <al...@gmail.com>.
Henryk,

thanks for another way of meeting my requirements. It works perfectly
well, but one important requirement is missing: I need to set the timeout
dynamically, so the flow could look like the following:

.when(body().isEqualTo("foo"))
     .enrich("direct:emulateLegacyHttp").timeout(property(TIMEOUT))
.otherwise()
     .enrich("direct:emulateNonLegacyHttp").timeout(property(TIMEOUT))
.end()

Unfortunately I cannot find a timeout that takes an argument of Expression
type :(

It would be best if I could specify a timeout for each flow in a route, as
in the excerpt below:

from("direct:start")
    .timeout(property(TIMEOUT))
    .process(new SomeProcessingHere())
    .to("direct:anotherEndpoint");

But it is probably difficult to implement such behaviour in Camel.

Anyway, there is still the open question of why my original example doesn't
work. Is it really a bug in Camel, as Babak is suggesting?

Thanks for help,
Alek





Re: Splitter + aggregator + dynamic timeout

Posted by Babak Vahdat <ba...@swissonline.ch>.
Changing the second route as follows makes the unit test pass:

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start").to("direct:process").to("mock:result");

                from("direct:process").process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        exchange.getIn().setBody("1+2+3");
                    }
                }).to("mock:result2");
            }
        };
    }

So to me this is definitely a bug!

Babak




Re: Splitter + aggregator + dynamic timeout

Posted by Babak Vahdat <ba...@swissonline.ch>.
Hi

The sample unit test Alek provided in this thread puzzles me a bit, so I
reduced it to a minimum (without parallel processing). I expected it to pass,
but it doesn't, as "mock:result" is not satisfied:

    java.lang.AssertionError: mock://result Body of message: 0. Expected: <1+2+3> but was: <A,B,C>

However "mock:result2" is satisfied, which is as expected.

Can someone please advise me about the point I'm missing, or could it maybe
be a bug?

// Imports assume the Camel 2.x package layout (camel-core, camel-test) and JUnit 4.
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class SplitterWithAggregatorTest extends CamelTestSupport {

    @Test
    public void shouldProcessCorrectlyOnBothSources() throws Exception {
        getMockEndpoint("mock:result").expectedBodiesReceived("1+2+3");
        getMockEndpoint("mock:result2").expectedBodiesReceived("1+2+3");

        template.requestBody("direct:start", "A,B,C");

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start").to("direct:process").to("mock:result");

                from("direct:process")
                    .split(body())
                    .bean(new Responder())
                    .aggregate(header("myId"), new MyAggregationStrategy())
                    .completionSize(3)
                    .to("mock:result2");
            }
        };
    }

    public static class Responder {

        // Sets the same correlation id on every part and maps A/B/C to 1/2/3.
        public String translate(Exchange exchange, String body) {
            exchange.getIn().setHeader("myId", "myValue");

            if ("A".equals(body)) {
                return "1";
            } else if ("B".equals(body)) {
                return "2";
            } else if ("C".equals(body)) {
                return "3";
            } else {
                throw new IllegalArgumentException("bla bla");
            }
        }

    }

    public static class MyAggregationStrategy implements AggregationStrategy {

        // Joins the bodies with "+"; the first exchange is returned unchanged.
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            }

            String oldBody = oldExchange.getIn().getBody(String.class);
            String newBody = newExchange.getIn().getBody(String.class);
            oldExchange.getIn().setBody(oldBody + "+" + newBody);
            return oldExchange;
        }

    }
}


Thanks, Babak




Re: Splitter + aggregator + dynamic timeout

Posted by Henryk Konsek <he...@gmail.com>.
Hi Olek,

> I'm not sure whether the example I added to the original message is visible
> to you? I added an example route to the message above and it is visible via
> the web page. Please advise, as this is my first post here :)

Yeah, I can see the code via Nabble web interface, but the code
formatting makes the examples invisible for GMail. It is safer to send
examples as plain text. :)

> A more detailed description of my requirements is as follows:
> And yes, before I continue processing I need
> to aggregate the results from (2) and (3).

Ok, let's start with some EIP magic here. :) Your problem can be
solved with the following EIP flow:

a) Split the message coming to the aggregation HTTP endpoint.
b) Start concurrent processing for each part of the split message.
c) For each part of the message send it to the content based router.
d) The router decides to which Enricher endpoint message should be sent.
e) Aggregate the results using the GroupedExchangeAggregationStrategy.
f) Send response.

And here are sample routes demonstrating a possible implementation of the
flow above:

from("direct:emulateLegacyHttp").
  setBody().simple("emulateLegacyHttp-${body}");

from("direct:emulateNonLegacyHttp").
  setBody().simple("emulateNonLegacyHttp-${body}");

from("direct:serviceAggregator").
  split(body().tokenize(":")).
  aggregationStrategy(new GroupedExchangeAggregationStrategy()).
  parallelProcessing().
  choice().
    when(body().isEqualTo("foo")).
      enrich("direct:emulateLegacyHttp").
    otherwise().
      enrich("direct:emulateNonLegacyHttp").
   end().
   end().
   to("direct:aggregatedResults");

These are the core routing rules for your issue. You can enhance it
with timeout, filtering and other features you need. The results of
aggregation will be stored in the exchange property under the
following key: Exchange.GROUPED_EXCHANGE.
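
For readers without a Camel setup at hand, the flow (a) to (f) can be
mirrored in plain Java. This is only an analogy under assumptions: the class
name is made up, the two emulate methods copy what the sample routes above
put into the body, and a plain list stands in for the grouped exchanges.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SplitRouteGroup {

    static String emulateLegacyHttp(String body) {
        return "emulateLegacyHttp-" + body;
    }

    static String emulateNonLegacyHttp(String body) {
        return "emulateNonLegacyHttp-" + body;
    }

    static List<String> serviceAggregator(String message) throws Exception {
        String[] parts = message.split(":");                      // (a) split
        ExecutorService pool = Executors.newFixedThreadPool(parts.length);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String part : parts) {                            // (b) one task per part
                Callable<String> task = () -> "foo".equals(part)   // (c), (d) route by content
                        ? emulateLegacyHttp(part)
                        : emulateNonLegacyHttp(part);
                futures.add(pool.submit(task));
            }
            List<String> grouped = new ArrayList<>();              // (e) group all replies
            for (Future<String> f : futures) {
                grouped.add(f.get());
            }
            return grouped;                                        // (f) response
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // prints [emulateLegacyHttp-foo, emulateNonLegacyHttp-bar]
        System.out.println(serviceAggregator("foo:bar"));
    }
}
```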

Remember that Camel is pretty flexible so my solution is not the only
one possible. That's how I would do it. :)

Here is also some reading [1] from my blog regarding service aggregation. :)

[1] http://henryk-konsek.blogspot.com/2012/05/aggregating-multiple-web-services.html

-- 
Henryk Konsek
http://henryk-konsek.blogspot.com

Re: Splitter + aggregator + dynamic timeout

Posted by Aleksander Pena <al...@gmail.com>.
Hi Henryk,


> First of all - apparently you forgot to add the routes examples to the
> message. :) Could you send them here?

I'm not sure whether the example I added to the original message is visible
to you? I added an example route to the message above and it is visible via
the web page. Please advise, as this is my first post here :)

> The key question is what you want to do with the responses from
> both services (legacy and non-legacy). Do you need to collect both
> results before continuing processing? Do you want to merge the
> aggregated results into a single message?

A more detailed description of my requirements is as follows:
The incoming message comes from an http endpoint (1) and the reply should be
sent back to it.
The legacy (2) and new (3) systems are http endpoints as well. I need to send
requests to them, aggregate their responses into one response and send it
back to the starting endpoint (1). Responses from (2) and (3) should be
aggregated together and some additional processing is needed
(removing duplicates etc). And yes, before I continue processing I need
to aggregate the results from (2) and (3).
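
The merge step with duplicate removal could look roughly like the sketch
below. The response format is not given in this thread, so treating each
response as a list of string records is purely an assumption, as are the
class and method names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class MergeResponses {

    /**
     * Merges the records of both systems into one response, dropping duplicates.
     * Records from the new system come first; a missing (null) response, for
     * example after a timeout, is treated as empty.
     */
    static List<String> merge(List<String> fromNew, List<String> fromLegacy) {
        Set<String> merged = new LinkedHashSet<>();
        merged.addAll(fromNew == null ? Collections.<String>emptyList() : fromNew);
        merged.addAll(fromLegacy == null ? Collections.<String>emptyList() : fromLegacy);
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        // prints [a, b, c]
        System.out.println(merge(Arrays.asList("a", "b"), Arrays.asList("b", "c")));
    }
}
```

A LinkedHashSet keeps the first occurrence of each record and preserves
insertion order, so the merged response stays stable between runs.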

Regards,
Alek





Re: Splitter + aggregator + dynamic timeout

Posted by Henryk Konsek <he...@gmail.com>.
Hi Olek,

First of all - apparently you forgot to add the routes examples to the
message. :) Could you send them here?

> I wonder which EIP components can be used for such service?

Splitter or Multicast with parallel processing enabled. Plus some kind
of aggregation (more about this below).

The key question is what you want to do with the responses from
both services (legacy and non-legacy). Do you need to collect both
results before continuing processing? Do you want to merge the
aggregated results into a single message?

Send us your routes and tell us more about how you treat the responses
from the services you call. Then we should be able to give you a more
precise answer :) .

Laters.

-- 
Henryk Konsek
http://henryk-konsek.blogspot.com