You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Andrew Chandler <an...@riftware.com> on 2010/03/01 17:40:35 UTC

Best Strategy - aggregation

Hi there - with Clause help I've been able to get most of the way to
where I need to be.   Right now I'm doing a proof of concept with string
payloads,however in the end the payload will be an object.   Here's what
I'm attempting


I have an incoming message that contains an identifier as well as (N)
things to do against it.   The (N) things can be done in parallel.    So
what we are doing is splitting based on the (N) things.     Here's where
it gets tricky.   
- The first of the (N) things to report success should be sent on while
the rest of them should be aborted.   We should then forward the success
on immediately not waiting for timeouts
- Further, in the event that none of them report success we should
aggregate until all (N) things have reported failure and then forward
that single negative result onward.     
- As the (N) things inherently have timeouts built into them it would be
nice if I didn't have to deal with batchTimeout for the aggregator.



What I'm seeing now with my prototype is that I can successfully spit
and process the split things using a threadPoolExecutor.   I provided
to .aggregate(header("JMSCorrelationID"),new MyAggregationStrategy()) 


Assume each of the split items have a built-in timeout on their work
effort of 5 seconds
With that result and without a .batchTimeout(7000L)   I was seeing 2
results from aggregate,   - 1 almost immediately for the successful
result and then a second aggregated message that had all the falures
about 4.5 seconds later.     When I tacked .batchTimeout(7000L) onto
the .aggregate clause though I got 1 single message that had the success
and the failures all in one.      This is close, however I guess what
I'm asking is how can I control from inside the aggregation the decision
to move forward?    In the splitter I'm already planning on including in
each split object a sharedobject that can be used to abort any of the
sibling split objects so I trhink I have a handle on that.
Basically the reason I need the aggregate mechanism to control the
continuing on part of the process is that if we're going after say
60,000 things then the ability to start work on the successful ones
after 1/2 second instead of waiting 6 or 7 seconds for a batch timeout
is significant.    But I still have to account for a totally negative
response in the event none of them are successful.     

I'm presently looking at creating my own AggregationCollection as it
seemed to allow me to figure out size of the aggregated collection and I
can somehow figure out the total number of items split versus how many
have been aggregated to determine I'm done.   (I thought that info was
supposed to be in the header somewhere but it doesn't seem to be there)


Any insights or redirects are appreciated.

Re: Best Strategy - aggregation

Posted by Andrew Chandler <an...@riftware.com>.
I just realized I totally misinterpreted your question - the maps you
asked about are the ones we appear to be leaking which were intended to
be references only however I think what's happening is that they aren't
going away when we think they are.   I think what is happening is that
some of the routes spawn new routes etc etc and it isn't unwinding due
to logic holes on our part.    We're in the process of trying to prove
that now

On Tue, 2010-03-02 at 16:38 +0100, Claus Ibsen wrote:

> On Tue, Mar 2, 2010 at 4:09 PM, Andrew Chandler <an...@riftware.com> wrote:
> > We wound up resolving this by basically totally subverting a secondary
> > splitter - we split the first time to do the parallel work and then
> > route to a second custom splitter that maintains an internal map of
> > result objects - the map is based on the correlation key and the object
> > includes properties for totalsplit count for that correlation key,
> > results received so far, and isForwarded.     Basically the first time a
> > correlation comes in a result is created into the map for the
> > correlationid.   If any of the results indicate success we set the
> > isForwarded to true and return a message in our list of results
> > indicating the success, however we don't remove the correlation id from
> > the map until ALL responses are received.   It's not as pretty as an
> > aggregator but it does seem to work for us.    We've had to do that
> > elsewhere as well though and considering we seem to be using memory at a
> > staggering rate my worry is that we're passing around too many maps in
> > messages that are duplicated when split etc.    Time to pull out the
> > profiler I guess :)
> >
> 
> Thanks for sharing your solution.
> 
> Where do you store those Maps? I would assume the copy is cheap as its
> just a reference copy, and not a deep copy.
> 
> 
> >
> >
> > On Tue, 2010-03-02 at 07:03 +0100, Claus Ibsen wrote:
> >
> >> On Mon, Mar 1, 2010 at 8:24 PM, Andrew Chandler <an...@riftware.com> wrote:
> >> > When does 2.3 come out - sounds like what I want, just I'm pretty sure
> >> > we can't update to something that isn't released yet or at least very
> >> > close to release
> >> >
> >>
> >> 2.2 was just recently released. I would think 2.3 is a couple of more
> >> months away.
> >>
> >> If you want supported and more often released version of Camel then I
> >> can only recommend taking a look at the FUSE versions.
> >>
> >>
> >> >
> >> > On Mon, 2010-03-01 at 19:51 +0100, Claus Ibsen wrote:
> >> >
> >> >> Hi
> >> >>
> >> >> Try with the new overhauled aggreagtor in 2.3
> >> >> http://camel.apache.org/aggregator2.html
> >> >>
> >> >> It works bette with completion trigger.
> >> >>
> >> >>
> >> >> On Mon, Mar 1, 2010 at 5:40 PM, Andrew Chandler <an...@riftware.com> wrote:
> >> >> > Hi there - with Clause help I've been able to get most of the way to
> >> >> > where I need to be.   Right now I'm doing a proof of concept with string
> >> >> > payloads,however in the end the payload will be an object.   Here's what
> >> >> > I'm attempting
> >> >> >
> >> >> >
> >> >> > I have an incoming message that contains an identifier as well as (N)
> >> >> > things to do against it.   The (N) things can be done in parallel.    So
> >> >> > what we are doing is splitting based on the (N) things.     Here's where
> >> >> > it gets tricky.
> >> >> > - The first of the (N) things to report success should be sent on while
> >> >> > the rest of them should be aborted.   We should then forward the success
> >> >> > on immediately not waiting for timeouts
> >> >> > - Further, in the event that none of them report success we should
> >> >> > aggregate until all (N) things have reported failure and then forward
> >> >> > that single negative result onward.
> >> >> > - As the (N) things inherently have timeouts built into them it would be
> >> >> > nice if I didn't have to deal with batchTimeout for the aggregator.
> >> >> >
> >> >> >
> >> >> >
> >> >> > What I'm seeing now with my prototype is that I can successfully spit
> >> >> > and process the split things using a threadPoolExecutor.   I provided
> >> >> > to .aggregate(header("JMSCorrelationID"),new MyAggregationStrategy())
> >> >> >
> >> >> >
> >> >> > Assume each of the split items have a built-in timeout on their work
> >> >> > effort of 5 seconds
> >> >> > With that result and without a .batchTimeout(7000L)   I was seeing 2
> >> >> > results from aggregate,   - 1 almost immediately for the successful
> >> >> > result and then a second aggregated message that had all the falures
> >> >> > about 4.5 seconds later.     When I tacked .batchTimeout(7000L) onto
> >> >> > the .aggregate clause though I got 1 single message that had the success
> >> >> > and the failures all in one.      This is close, however I guess what
> >> >> > I'm asking is how can I control from inside the aggregation the decision
> >> >> > to move forward?    In the splitter I'm already planning on including in
> >> >> > each split object a sharedobject that can be used to abort any of the
> >> >> > sibling split objects so I trhink I have a handle on that.
> >> >> > Basically the reason I need the aggregate mechanism to control the
> >> >> > continuing on part of the process is that if we're going after say
> >> >> > 60,000 things then the ability to start work on the successful ones
> >> >> > after 1/2 second instead of waiting 6 or 7 seconds for a batch timeout
> >> >> > is significant.    But I still have to account for a totally negative
> >> >> > response in the event none of them are successful.
> >> >> >
> >> >> > I'm presently looking at creating my own AggregationCollection as it
> >> >> > seemed to allow me to figure out size of the aggregated collection and I
> >> >> > can somehow figure out the total number of items split versus how many
> >> >> > have been aggregated to determine I'm done.   (I thought that info was
> >> >> > supposed to be in the header somewhere but it doesn't seem to be there)
> >> >> >
> >> >> >
> >> >> > Any insights or redirects are appreciated.
> >> >> >
> >> >>
> >> >>
> >> >>
> >> >
> >> >
> >> >
> >>
> >>
> >>
> >
> >
> >
> 
> 
> 



Re: Best Strategy - aggregation

Posted by Andrew Chandler <an...@riftware.com>.
The map is an member of the instance of the second splitter - our
understanding is that there is one splitter per instance of the route so
there really is just the one copy of the map in our circumstance.....if
the splitter was useful for more than one route of course there would be
more than one map (one per copy of the splitter) - The only hole in that
theory is if more than one instance of the splitter bean is invoked for
a single route.


On Tue, 2010-03-02 at 16:38 +0100, Claus Ibsen wrote:

> On Tue, Mar 2, 2010 at 4:09 PM, Andrew Chandler <an...@riftware.com> wrote:
> > We wound up resolving this by basically totally subverting a secondary
> > splitter - we split the first time to do the parallel work and then
> > route to a second custom splitter that maintains an internal map of
> > result objects - the map is based on the correlation key and the object
> > includes properties for totalsplit count for that correlation key,
> > results received so far, and isForwarded.     Basically the first time a
> > correlation comes in a result is created into the map for the
> > correlationid.   If any of the results indicate success we set the
> > isForwarded to true and return a message in our list of results
> > indicating the success, however we don't remove the correlation id from
> > the map until ALL responses are received.   It's not as pretty as an
> > aggregator but it does seem to work for us.    We've had to do that
> > elsewhere as well though and considering we seem to be using memory at a
> > staggering rate my worry is that we're passing around too many maps in
> > messages that are duplicated when split etc.    Time to pull out the
> > profiler I guess :)
> >
> 
> Thanks for sharing your solution.
> 
> Where do you store those Maps? I would assume the copy is cheap as its
> just a reference copy, and not a deep copy.
> 
> 
> >
> >
> > On Tue, 2010-03-02 at 07:03 +0100, Claus Ibsen wrote:
> >
> >> On Mon, Mar 1, 2010 at 8:24 PM, Andrew Chandler <an...@riftware.com> wrote:
> >> > When does 2.3 come out - sounds like what I want, just I'm pretty sure
> >> > we can't update to something that isn't released yet or at least very
> >> > close to release
> >> >
> >>
> >> 2.2 was just recently released. I would think 2.3 is a couple of more
> >> months away.
> >>
> >> If you want supported and more often released version of Camel then I
> >> can only recommend taking a look at the FUSE versions.
> >>
> >>
> >> >
> >> > On Mon, 2010-03-01 at 19:51 +0100, Claus Ibsen wrote:
> >> >
> >> >> Hi
> >> >>
> >> >> Try with the new overhauled aggreagtor in 2.3
> >> >> http://camel.apache.org/aggregator2.html
> >> >>
> >> >> It works bette with completion trigger.
> >> >>
> >> >>
> >> >> On Mon, Mar 1, 2010 at 5:40 PM, Andrew Chandler <an...@riftware.com> wrote:
> >> >> > Hi there - with Clause help I've been able to get most of the way to
> >> >> > where I need to be.   Right now I'm doing a proof of concept with string
> >> >> > payloads,however in the end the payload will be an object.   Here's what
> >> >> > I'm attempting
> >> >> >
> >> >> >
> >> >> > I have an incoming message that contains an identifier as well as (N)
> >> >> > things to do against it.   The (N) things can be done in parallel.    So
> >> >> > what we are doing is splitting based on the (N) things.     Here's where
> >> >> > it gets tricky.
> >> >> > - The first of the (N) things to report success should be sent on while
> >> >> > the rest of them should be aborted.   We should then forward the success
> >> >> > on immediately not waiting for timeouts
> >> >> > - Further, in the event that none of them report success we should
> >> >> > aggregate until all (N) things have reported failure and then forward
> >> >> > that single negative result onward.
> >> >> > - As the (N) things inherently have timeouts built into them it would be
> >> >> > nice if I didn't have to deal with batchTimeout for the aggregator.
> >> >> >
> >> >> >
> >> >> >
> >> >> > What I'm seeing now with my prototype is that I can successfully spit
> >> >> > and process the split things using a threadPoolExecutor.   I provided
> >> >> > to .aggregate(header("JMSCorrelationID"),new MyAggregationStrategy())
> >> >> >
> >> >> >
> >> >> > Assume each of the split items have a built-in timeout on their work
> >> >> > effort of 5 seconds
> >> >> > With that result and without a .batchTimeout(7000L)   I was seeing 2
> >> >> > results from aggregate,   - 1 almost immediately for the successful
> >> >> > result and then a second aggregated message that had all the falures
> >> >> > about 4.5 seconds later.     When I tacked .batchTimeout(7000L) onto
> >> >> > the .aggregate clause though I got 1 single message that had the success
> >> >> > and the failures all in one.      This is close, however I guess what
> >> >> > I'm asking is how can I control from inside the aggregation the decision
> >> >> > to move forward?    In the splitter I'm already planning on including in
> >> >> > each split object a sharedobject that can be used to abort any of the
> >> >> > sibling split objects so I trhink I have a handle on that.
> >> >> > Basically the reason I need the aggregate mechanism to control the
> >> >> > continuing on part of the process is that if we're going after say
> >> >> > 60,000 things then the ability to start work on the successful ones
> >> >> > after 1/2 second instead of waiting 6 or 7 seconds for a batch timeout
> >> >> > is significant.    But I still have to account for a totally negative
> >> >> > response in the event none of them are successful.
> >> >> >
> >> >> > I'm presently looking at creating my own AggregationCollection as it
> >> >> > seemed to allow me to figure out size of the aggregated collection and I
> >> >> > can somehow figure out the total number of items split versus how many
> >> >> > have been aggregated to determine I'm done.   (I thought that info was
> >> >> > supposed to be in the header somewhere but it doesn't seem to be there)
> >> >> >
> >> >> >
> >> >> > Any insights or redirects are appreciated.
> >> >> >
> >> >>
> >> >>
> >> >>
> >> >
> >> >
> >> >
> >>
> >>
> >>
> >
> >
> >
> 
> 
> 



Re: Best Strategy - aggregation

Posted by Claus Ibsen <cl...@gmail.com>.
On Tue, Mar 2, 2010 at 4:09 PM, Andrew Chandler <an...@riftware.com> wrote:
> We wound up resolving this by basically totally subverting a secondary
> splitter - we split the first time to do the parallel work and then
> route to a second custom splitter that maintains an internal map of
> result objects - the map is based on the correlation key and the object
> includes properties for totalsplit count for that correlation key,
> results received so far, and isForwarded.     Basically the first time a
> correlation comes in a result is created into the map for the
> correlationid.   If any of the results indicate success we set the
> isForwarded to true and return a message in our list of results
> indicating the success, however we don't remove the correlation id from
> the map until ALL responses are received.   It's not as pretty as an
> aggregator but it does seem to work for us.    We've had to do that
> elsewhere as well though and considering we seem to be using memory at a
> staggering rate my worry is that we're passing around too many maps in
> messages that are duplicated when split etc.    Time to pull out the
> profiler I guess :)
>

Thanks for sharing your solution.

Where do you store those Maps? I would assume the copy is cheap as its
just a reference copy, and not a deep copy.


>
>
> On Tue, 2010-03-02 at 07:03 +0100, Claus Ibsen wrote:
>
>> On Mon, Mar 1, 2010 at 8:24 PM, Andrew Chandler <an...@riftware.com> wrote:
>> > When does 2.3 come out - sounds like what I want, just I'm pretty sure
>> > we can't update to something that isn't released yet or at least very
>> > close to release
>> >
>>
>> 2.2 was just recently released. I would think 2.3 is a couple of more
>> months away.
>>
>> If you want supported and more often released version of Camel then I
>> can only recommend taking a look at the FUSE versions.
>>
>>
>> >
>> > On Mon, 2010-03-01 at 19:51 +0100, Claus Ibsen wrote:
>> >
>> >> Hi
>> >>
>> >> Try with the new overhauled aggreagtor in 2.3
>> >> http://camel.apache.org/aggregator2.html
>> >>
>> >> It works bette with completion trigger.
>> >>
>> >>
>> >> On Mon, Mar 1, 2010 at 5:40 PM, Andrew Chandler <an...@riftware.com> wrote:
>> >> > Hi there - with Clause help I've been able to get most of the way to
>> >> > where I need to be.   Right now I'm doing a proof of concept with string
>> >> > payloads,however in the end the payload will be an object.   Here's what
>> >> > I'm attempting
>> >> >
>> >> >
>> >> > I have an incoming message that contains an identifier as well as (N)
>> >> > things to do against it.   The (N) things can be done in parallel.    So
>> >> > what we are doing is splitting based on the (N) things.     Here's where
>> >> > it gets tricky.
>> >> > - The first of the (N) things to report success should be sent on while
>> >> > the rest of them should be aborted.   We should then forward the success
>> >> > on immediately not waiting for timeouts
>> >> > - Further, in the event that none of them report success we should
>> >> > aggregate until all (N) things have reported failure and then forward
>> >> > that single negative result onward.
>> >> > - As the (N) things inherently have timeouts built into them it would be
>> >> > nice if I didn't have to deal with batchTimeout for the aggregator.
>> >> >
>> >> >
>> >> >
>> >> > What I'm seeing now with my prototype is that I can successfully spit
>> >> > and process the split things using a threadPoolExecutor.   I provided
>> >> > to .aggregate(header("JMSCorrelationID"),new MyAggregationStrategy())
>> >> >
>> >> >
>> >> > Assume each of the split items have a built-in timeout on their work
>> >> > effort of 5 seconds
>> >> > With that result and without a .batchTimeout(7000L)   I was seeing 2
>> >> > results from aggregate,   - 1 almost immediately for the successful
>> >> > result and then a second aggregated message that had all the falures
>> >> > about 4.5 seconds later.     When I tacked .batchTimeout(7000L) onto
>> >> > the .aggregate clause though I got 1 single message that had the success
>> >> > and the failures all in one.      This is close, however I guess what
>> >> > I'm asking is how can I control from inside the aggregation the decision
>> >> > to move forward?    In the splitter I'm already planning on including in
>> >> > each split object a sharedobject that can be used to abort any of the
>> >> > sibling split objects so I trhink I have a handle on that.
>> >> > Basically the reason I need the aggregate mechanism to control the
>> >> > continuing on part of the process is that if we're going after say
>> >> > 60,000 things then the ability to start work on the successful ones
>> >> > after 1/2 second instead of waiting 6 or 7 seconds for a batch timeout
>> >> > is significant.    But I still have to account for a totally negative
>> >> > response in the event none of them are successful.
>> >> >
>> >> > I'm presently looking at creating my own AggregationCollection as it
>> >> > seemed to allow me to figure out size of the aggregated collection and I
>> >> > can somehow figure out the total number of items split versus how many
>> >> > have been aggregated to determine I'm done.   (I thought that info was
>> >> > supposed to be in the header somewhere but it doesn't seem to be there)
>> >> >
>> >> >
>> >> > Any insights or redirects are appreciated.
>> >> >
>> >>
>> >>
>> >>
>> >
>> >
>> >
>>
>>
>>
>
>
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Best Strategy - aggregation

Posted by Andrew Chandler <an...@riftware.com>.
We wound up resolving this by basically totally subverting a secondary
splitter - we split the first time to do the parallel work and then
route to a second custom splitter that maintains an internal map of
result objects - the map is based on the correlation key and the object
includes properties for totalsplit count for that correlation key,
results received so far, and isForwarded.     Basically the first time a
correlation comes in a result is created into the map for the
correlationid.   If any of the results indicate success we set the
isForwarded to true and return a message in our list of results
indicating the success, however we don't remove the correlation id from
the map until ALL responses are received.   It's not as pretty as an
aggregator but it does seem to work for us.    We've had to do that
elsewhere as well though and considering we seem to be using memory at a
staggering rate my worry is that we're passing around too many maps in
messages that are duplicated when split etc.    Time to pull out the
profiler I guess :)



On Tue, 2010-03-02 at 07:03 +0100, Claus Ibsen wrote:

> On Mon, Mar 1, 2010 at 8:24 PM, Andrew Chandler <an...@riftware.com> wrote:
> > When does 2.3 come out - sounds like what I want, just I'm pretty sure
> > we can't update to something that isn't released yet or at least very
> > close to release
> >
> 
> 2.2 was just recently released. I would think 2.3 is a couple of more
> months away.
> 
> If you want supported and more often released version of Camel then I
> can only recommend taking a look at the FUSE versions.
> 
> 
> >
> > On Mon, 2010-03-01 at 19:51 +0100, Claus Ibsen wrote:
> >
> >> Hi
> >>
> >> Try with the new overhauled aggreagtor in 2.3
> >> http://camel.apache.org/aggregator2.html
> >>
> >> It works bette with completion trigger.
> >>
> >>
> >> On Mon, Mar 1, 2010 at 5:40 PM, Andrew Chandler <an...@riftware.com> wrote:
> >> > Hi there - with Clause help I've been able to get most of the way to
> >> > where I need to be.   Right now I'm doing a proof of concept with string
> >> > payloads,however in the end the payload will be an object.   Here's what
> >> > I'm attempting
> >> >
> >> >
> >> > I have an incoming message that contains an identifier as well as (N)
> >> > things to do against it.   The (N) things can be done in parallel.    So
> >> > what we are doing is splitting based on the (N) things.     Here's where
> >> > it gets tricky.
> >> > - The first of the (N) things to report success should be sent on while
> >> > the rest of them should be aborted.   We should then forward the success
> >> > on immediately not waiting for timeouts
> >> > - Further, in the event that none of them report success we should
> >> > aggregate until all (N) things have reported failure and then forward
> >> > that single negative result onward.
> >> > - As the (N) things inherently have timeouts built into them it would be
> >> > nice if I didn't have to deal with batchTimeout for the aggregator.
> >> >
> >> >
> >> >
> >> > What I'm seeing now with my prototype is that I can successfully spit
> >> > and process the split things using a threadPoolExecutor.   I provided
> >> > to .aggregate(header("JMSCorrelationID"),new MyAggregationStrategy())
> >> >
> >> >
> >> > Assume each of the split items have a built-in timeout on their work
> >> > effort of 5 seconds
> >> > With that result and without a .batchTimeout(7000L)   I was seeing 2
> >> > results from aggregate,   - 1 almost immediately for the successful
> >> > result and then a second aggregated message that had all the falures
> >> > about 4.5 seconds later.     When I tacked .batchTimeout(7000L) onto
> >> > the .aggregate clause though I got 1 single message that had the success
> >> > and the failures all in one.      This is close, however I guess what
> >> > I'm asking is how can I control from inside the aggregation the decision
> >> > to move forward?    In the splitter I'm already planning on including in
> >> > each split object a sharedobject that can be used to abort any of the
> >> > sibling split objects so I trhink I have a handle on that.
> >> > Basically the reason I need the aggregate mechanism to control the
> >> > continuing on part of the process is that if we're going after say
> >> > 60,000 things then the ability to start work on the successful ones
> >> > after 1/2 second instead of waiting 6 or 7 seconds for a batch timeout
> >> > is significant.    But I still have to account for a totally negative
> >> > response in the event none of them are successful.
> >> >
> >> > I'm presently looking at creating my own AggregationCollection as it
> >> > seemed to allow me to figure out size of the aggregated collection and I
> >> > can somehow figure out the total number of items split versus how many
> >> > have been aggregated to determine I'm done.   (I thought that info was
> >> > supposed to be in the header somewhere but it doesn't seem to be there)
> >> >
> >> >
> >> > Any insights or redirects are appreciated.
> >> >
> >>
> >>
> >>
> >
> >
> >
> 
> 
> 



Re: Best Strategy - aggregation

Posted by Andrew Chandler <an...@riftware.com>.
Thanks Clause
On Tue, 2010-03-02 at 07:03 +0100, Claus Ibsen wrote:

> On Mon, Mar 1, 2010 at 8:24 PM, Andrew Chandler <an...@riftware.com> wrote:
> > When does 2.3 come out - sounds like what I want, just I'm pretty sure
> > we can't update to something that isn't released yet or at least very
> > close to release
> >
> 
> 2.2 was just recently released. I would think 2.3 is a couple of more
> months away.
> 
> If you want supported and more often released version of Camel then I
> can only recommend taking a look at the FUSE versions.
> 
> 
> >
> > On Mon, 2010-03-01 at 19:51 +0100, Claus Ibsen wrote:
> >
> >> Hi
> >>
> >> Try with the new overhauled aggreagtor in 2.3
> >> http://camel.apache.org/aggregator2.html
> >>
> >> It works bette with completion trigger.
> >>
> >>
> >> On Mon, Mar 1, 2010 at 5:40 PM, Andrew Chandler <an...@riftware.com> wrote:
> >> > Hi there - with Clause help I've been able to get most of the way to
> >> > where I need to be.   Right now I'm doing a proof of concept with string
> >> > payloads,however in the end the payload will be an object.   Here's what
> >> > I'm attempting
> >> >
> >> >
> >> > I have an incoming message that contains an identifier as well as (N)
> >> > things to do against it.   The (N) things can be done in parallel.    So
> >> > what we are doing is splitting based on the (N) things.     Here's where
> >> > it gets tricky.
> >> > - The first of the (N) things to report success should be sent on while
> >> > the rest of them should be aborted.   We should then forward the success
> >> > on immediately not waiting for timeouts
> >> > - Further, in the event that none of them report success we should
> >> > aggregate until all (N) things have reported failure and then forward
> >> > that single negative result onward.
> >> > - As the (N) things inherently have timeouts built into them it would be
> >> > nice if I didn't have to deal with batchTimeout for the aggregator.
> >> >
> >> >
> >> >
> >> > What I'm seeing now with my prototype is that I can successfully spit
> >> > and process the split things using a threadPoolExecutor.   I provided
> >> > to .aggregate(header("JMSCorrelationID"),new MyAggregationStrategy())
> >> >
> >> >
> >> > Assume each of the split items have a built-in timeout on their work
> >> > effort of 5 seconds
> >> > With that result and without a .batchTimeout(7000L)   I was seeing 2
> >> > results from aggregate,   - 1 almost immediately for the successful
> >> > result and then a second aggregated message that had all the falures
> >> > about 4.5 seconds later.     When I tacked .batchTimeout(7000L) onto
> >> > the .aggregate clause though I got 1 single message that had the success
> >> > and the failures all in one.      This is close, however I guess what
> >> > I'm asking is how can I control from inside the aggregation the decision
> >> > to move forward?    In the splitter I'm already planning on including in
> >> > each split object a sharedobject that can be used to abort any of the
> >> > sibling split objects so I trhink I have a handle on that.
> >> > Basically the reason I need the aggregate mechanism to control the
> >> > continuing on part of the process is that if we're going after say
> >> > 60,000 things then the ability to start work on the successful ones
> >> > after 1/2 second instead of waiting 6 or 7 seconds for a batch timeout
> >> > is significant.    But I still have to account for a totally negative
> >> > response in the event none of them are successful.
> >> >
> >> > I'm presently looking at creating my own AggregationCollection as it
> >> > seemed to allow me to figure out size of the aggregated collection and I
> >> > can somehow figure out the total number of items split versus how many
> >> > have been aggregated to determine I'm done.   (I thought that info was
> >> > supposed to be in the header somewhere but it doesn't seem to be there)
> >> >
> >> >
> >> > Any insights or redirects are appreciated.
> >> >
> >>
> >>
> >>
> >
> >
> >
> 
> 
> 



Re: Best Strategy - aggregation

Posted by Claus Ibsen <cl...@gmail.com>.
On Mon, Mar 1, 2010 at 8:24 PM, Andrew Chandler <an...@riftware.com> wrote:
> When does 2.3 come out - sounds like what I want, just I'm pretty sure
> we can't update to something that isn't released yet or at least very
> close to release
>

2.2 was just recently released. I would think 2.3 is a couple of more
months away.

If you want supported and more often released version of Camel then I
can only recommend taking a look at the FUSE versions.


>
> On Mon, 2010-03-01 at 19:51 +0100, Claus Ibsen wrote:
>
>> Hi
>>
>> Try with the new overhauled aggreagtor in 2.3
>> http://camel.apache.org/aggregator2.html
>>
>> It works bette with completion trigger.
>>
>>
>> On Mon, Mar 1, 2010 at 5:40 PM, Andrew Chandler <an...@riftware.com> wrote:
>> > Hi there - with Clause help I've been able to get most of the way to
>> > where I need to be.   Right now I'm doing a proof of concept with string
>> > payloads,however in the end the payload will be an object.   Here's what
>> > I'm attempting
>> >
>> >
>> > I have an incoming message that contains an identifier as well as (N)
>> > things to do against it.   The (N) things can be done in parallel.    So
>> > what we are doing is splitting based on the (N) things.     Here's where
>> > it gets tricky.
>> > - The first of the (N) things to report success should be sent on while
>> > the rest of them should be aborted.   We should then forward the success
>> > on immediately not waiting for timeouts
>> > - Further, in the event that none of them report success we should
>> > aggregate until all (N) things have reported failure and then forward
>> > that single negative result onward.
>> > - As the (N) things inherently have timeouts built into them it would be
>> > nice if I didn't have to deal with batchTimeout for the aggregator.
>> >
>> >
>> >
>> > What I'm seeing now with my prototype is that I can successfully spit
>> > and process the split things using a threadPoolExecutor.   I provided
>> > to .aggregate(header("JMSCorrelationID"),new MyAggregationStrategy())
>> >
>> >
>> > Assume each of the split items have a built-in timeout on their work
>> > effort of 5 seconds
>> > With that result and without a .batchTimeout(7000L)   I was seeing 2
>> > results from aggregate,   - 1 almost immediately for the successful
>> > result and then a second aggregated message that had all the falures
>> > about 4.5 seconds later.     When I tacked .batchTimeout(7000L) onto
>> > the .aggregate clause though I got 1 single message that had the success
>> > and the failures all in one.      This is close, however I guess what
>> > I'm asking is how can I control from inside the aggregation the decision
>> > to move forward?    In the splitter I'm already planning on including in
>> > each split object a sharedobject that can be used to abort any of the
>> > sibling split objects so I trhink I have a handle on that.
>> > Basically the reason I need the aggregate mechanism to control the
>> > continuing on part of the process is that if we're going after say
>> > 60,000 things then the ability to start work on the successful ones
>> > after 1/2 second instead of waiting 6 or 7 seconds for a batch timeout
>> > is significant.    But I still have to account for a totally negative
>> > response in the event none of them are successful.
>> >
>> > I'm presently looking at creating my own AggregationCollection as it
>> > seemed to allow me to figure out size of the aggregated collection and I
>> > can somehow figure out the total number of items split versus how many
>> > have been aggregated to determine I'm done.   (I thought that info was
>> > supposed to be in the header somewhere but it doesn't seem to be there)
>> >
>> >
>> > Any insights or redirects are appreciated.
>> >
>>
>>
>>
>
>
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Best Strategy - aggregation

Posted by Andrew Chandler <an...@riftware.com>.
When does 2.3 come out - sounds like what I want, just I'm pretty sure
we can't update to something that isn't released yet or at least very
close to release


On Mon, 2010-03-01 at 19:51 +0100, Claus Ibsen wrote:

> Hi
> 
> Try with the new overhauled aggreagtor in 2.3
> http://camel.apache.org/aggregator2.html
> 
> It works bette with completion trigger.
> 
> 
> On Mon, Mar 1, 2010 at 5:40 PM, Andrew Chandler <an...@riftware.com> wrote:
> > Hi there - with Clause help I've been able to get most of the way to
> > where I need to be.   Right now I'm doing a proof of concept with string
> > payloads,however in the end the payload will be an object.   Here's what
> > I'm attempting
> >
> >
> > I have an incoming message that contains an identifier as well as (N)
> > things to do against it.   The (N) things can be done in parallel.    So
> > what we are doing is splitting based on the (N) things.     Here's where
> > it gets tricky.
> > - The first of the (N) things to report success should be sent on while
> > the rest of them should be aborted.   We should then forward the success
> > on immediately not waiting for timeouts
> > - Further, in the event that none of them report success we should
> > aggregate until all (N) things have reported failure and then forward
> > that single negative result onward.
> > - As the (N) things inherently have timeouts built into them it would be
> > nice if I didn't have to deal with batchTimeout for the aggregator.
> >
> >
> >
> > What I'm seeing now with my prototype is that I can successfully spit
> > and process the split things using a threadPoolExecutor.   I provided
> > to .aggregate(header("JMSCorrelationID"),new MyAggregationStrategy())
> >
> >
> > Assume each of the split items have a built-in timeout on their work
> > effort of 5 seconds
> > With that result and without a .batchTimeout(7000L)   I was seeing 2
> > results from aggregate,   - 1 almost immediately for the successful
> > result and then a second aggregated message that had all the falures
> > about 4.5 seconds later.     When I tacked .batchTimeout(7000L) onto
> > the .aggregate clause though I got 1 single message that had the success
> > and the failures all in one.      This is close, however I guess what
> > I'm asking is how can I control from inside the aggregation the decision
> > to move forward?    In the splitter I'm already planning on including in
> > each split object a sharedobject that can be used to abort any of the
> > sibling split objects so I trhink I have a handle on that.
> > Basically the reason I need the aggregate mechanism to control the
> > continuing on part of the process is that if we're going after say
> > 60,000 things then the ability to start work on the successful ones
> > after 1/2 second instead of waiting 6 or 7 seconds for a batch timeout
> > is significant.    But I still have to account for a totally negative
> > response in the event none of them are successful.
> >
> > I'm presently looking at creating my own AggregationCollection as it
> > seemed to allow me to figure out size of the aggregated collection and I
> > can somehow figure out the total number of items split versus how many
> > have been aggregated to determine I'm done.   (I thought that info was
> > supposed to be in the header somewhere but it doesn't seem to be there)
> >
> >
> > Any insights or redirects are appreciated.
> >
> 
> 
> 



Re: Best Strategy - aggregation

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Try with the new overhauled aggreagtor in 2.3
http://camel.apache.org/aggregator2.html

It works bette with completion trigger.


On Mon, Mar 1, 2010 at 5:40 PM, Andrew Chandler <an...@riftware.com> wrote:
> Hi there - with Clause help I've been able to get most of the way to
> where I need to be.   Right now I'm doing a proof of concept with string
> payloads,however in the end the payload will be an object.   Here's what
> I'm attempting
>
>
> I have an incoming message that contains an identifier as well as (N)
> things to do against it.   The (N) things can be done in parallel.    So
> what we are doing is splitting based on the (N) things.     Here's where
> it gets tricky.
> - The first of the (N) things to report success should be sent on while
> the rest of them should be aborted.   We should then forward the success
> on immediately not waiting for timeouts
> - Further, in the event that none of them report success we should
> aggregate until all (N) things have reported failure and then forward
> that single negative result onward.
> - As the (N) things inherently have timeouts built into them it would be
> nice if I didn't have to deal with batchTimeout for the aggregator.
>
>
>
> What I'm seeing now with my prototype is that I can successfully spit
> and process the split things using a threadPoolExecutor.   I provided
> to .aggregate(header("JMSCorrelationID"),new MyAggregationStrategy())
>
>
> Assume each of the split items have a built-in timeout on their work
> effort of 5 seconds
> With that result and without a .batchTimeout(7000L)   I was seeing 2
> results from aggregate,   - 1 almost immediately for the successful
> result and then a second aggregated message that had all the falures
> about 4.5 seconds later.     When I tacked .batchTimeout(7000L) onto
> the .aggregate clause though I got 1 single message that had the success
> and the failures all in one.      This is close, however I guess what
> I'm asking is how can I control from inside the aggregation the decision
> to move forward?    In the splitter I'm already planning on including in
> each split object a sharedobject that can be used to abort any of the
> sibling split objects so I trhink I have a handle on that.
> Basically the reason I need the aggregate mechanism to control the
> continuing on part of the process is that if we're going after say
> 60,000 things then the ability to start work on the successful ones
> after 1/2 second instead of waiting 6 or 7 seconds for a batch timeout
> is significant.    But I still have to account for a totally negative
> response in the event none of them are successful.
>
> I'm presently looking at creating my own AggregationCollection as it
> seemed to allow me to figure out size of the aggregated collection and I
> can somehow figure out the total number of items split versus how many
> have been aggregated to determine I'm done.   (I thought that info was
> supposed to be in the header somewhere but it doesn't seem to be there)
>
>
> Any insights or redirects are appreciated.
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus