Posted to users@camel.apache.org by "Tom.Fornoville" <to...@roots.be> on 2013/10/22 09:32:50 UTC

Grouping messages for batch processing

Hi everyone,

I have the following use case:
* messages arrive on a queue, one message for each organisation to be
created/updated; messages that belong to the same transaction (= our own
term, NOT a JPA transaction or anything like that) all carry the same
transactionGUID

* we need to group organisations into batches of 50 for processing;
the last batch can be smaller

* during processing, extra entities can be added to the queue: updates for an
enterprise can trigger updates for the establishments under that enterprise

* after processing by the bean, we need to update the transaction entity with
statistics (# updated, # created, etc.)

I was thinking of doing this with 2 aggregators, like this:

<route id="process-organisations">
	<from uri="activemq:organisation.update.queue" />
	<aggregate strategyRef="organisationAggregator" completionSize="50"
completionTimeout="5000">
		<correlationExpression>
			<header>transactionGUID</header>
		</correlationExpression>
		<bean ref="organisationServiceActivator" method="updateOrganisations" />
		<to uri="activemq:organisation.processed.queue" />
	</aggregate>
</route>
        
<route id="aggregate-organisation-results">
	<from uri="activemq:organisation.processed.queue" />
	<aggregate strategyRef="?">
		<correlationExpression>
			<header>?</header>
		</correlationExpression>
		<completionPredicate>
			<simple>?</simple>
		</completionPredicate>
		<bean ref="transactionServiceActivator" method="updateTransaction" />
	</aggregate>
</route>

The questions I have:
* are 2 routes with aggregators the best way to handle this use case?
* will the first aggregate keep working, or will it stop after the first
batch of 50?
* what should be the correlation and completion parameters for the second
aggregation?

Thanks in advance,
Tom




--
View this message in context: http://camel.465427.n5.nabble.com/Grouping-messages-for-batch-processing-tp5742015.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Grouping messages for batch processing

Posted by Tom Fornoville <to...@roots.be>.
Hi Claus,

On the way home I realized that I should put the updateTransaction call
inside the aggregate and not after it... dumb copy/paste error.
Thanks for the tip about the property for the splitter.

Learning Camel and EIP one pattern at a time ;-)

Tom Fornoville
Senior Developer
m: +32 478 65 86 51
www.roots.be


On Tue, Oct 22, 2013 at 6:05 PM, Claus Ibsen <cl...@gmail.com> wrote:

> You should have the <choice> inside the <aggregate> so all the logic
> is executed as part of when the aggregator completes.
>
> And btw the splitter has a property it sets on the last message when
> its complete. You can transfer that as a header over JMS
Re: Grouping messages for batch processing

Posted by Claus Ibsen <cl...@gmail.com>.
You should put the <choice> inside the <aggregate>, so that all the logic
is executed when the aggregator completes.

And btw, the splitter sets a property on the last message when it's
complete. You can transfer that as a header over JMS:

<route>
  <from uri="file:src/data?noop=true" />
  <setHeader headerName="transactionGUID">
    <simple>${file:name.noext}-${date:now:yyyyMMdd-HHmmssSSS}</simple>
  </setHeader>
  <split>
    <tokenize token="organisation" xml="true" />
    <!-- Exchange.SPLIT_COMPLETE is the Java constant; in XML use its value, CamelSplitComplete -->
    <setHeader headerName="lastMessage">
      <property>CamelSplitComplete</property>
    </setHeader>
    <to uri="activemq:organisation.queue" />
  </split>
</route>
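
To make the suggestion above concrete, here is a rough sketch of the consuming route with the <choice> moved inside the <aggregate>. It is only a sketch and rests on assumptions that are not from this thread: the organisationAggregator strategy copies the lastMessage header from the newest exchange onto the aggregated one, messages are consumed in order by a single consumer, and the completionPredicate with eagerCheckCompletion is an addition so that a final, smaller batch does not have to wait for the timeout. It does not address extra entities being queued during processing.

```xml
<route id="process-organisations">
  <from uri="activemq:organisation.queue" />
  <!-- eagerCheckCompletion: evaluate the predicate on the incoming message,
       not on the aggregated exchange -->
  <aggregate strategyRef="organisationAggregator" completionSize="50"
             completionTimeout="5000" eagerCheckCompletion="true">
    <correlationExpression>
      <header>transactionGUID</header>
    </correlationExpression>
    <!-- assumption: complete the batch as soon as the flagged last message arrives -->
    <completionPredicate>
      <simple>${header.lastMessage} == true</simple>
    </completionPredicate>
    <!-- runs once per completed batch -->
    <bean ref="organisationServiceActivator" method="updateOrganisations" />
    <!-- runs only for the batch that contains the last message
         (assumes the strategy kept the lastMessage header) -->
    <choice>
      <when>
        <simple>${header.lastMessage} == true</simple>
        <bean ref="transactionServiceActivator" method="updateTransaction" />
      </when>
    </choice>
  </aggregate>
</route>
```

Under those assumptions, a file with 120 organisations should give batches 1-50, 51-100 and 101-120, with updateTransaction running only after the last batch has gone through updateOrganisations.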


On Tue, Oct 22, 2013 at 4:48 PM, Tom Fornoville <to...@roots.be> wrote:
> thanks Claus and Erwin for the answers, I'm starting to understand the
> patterns a little bit better.
>
> I am however still not sure how to handle the use-case with only 1
> aggregator.
> As Claus suggested I'm now sending an extra message after all the
> organisations are split.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Grouping messages for batch processing

Posted by Tom Fornoville <to...@roots.be>.
Thanks Claus and Erwin for the answers. I'm starting to understand the
patterns a little bit better.

However, I am still not sure how to handle the use case with only 1
aggregator.
As Claus suggested, I'm now sending an extra message after all the
organisations are split.

The routes now look something like this:

<route>
  <from uri="file:src/data?noop=true" />
  <setHeader headerName="transactionGUID">
    <simple>${file:name.noext}-${date:now:yyyyMMdd-HHmmssSSS}</simple>
  </setHeader>
  <split>
    <tokenize token="organisation" xml="true" />
    <to uri="activemq:organisation.queue" />
  </split>
  <setBody>
    <simple>LAST_LINE</simple>
  </setBody>
  <to uri="activemq:organisation.queue" />
</route>

<route id="process-organisations">
  <from uri="activemq:organisation.queue" />
  <aggregate strategyRef="organisationAggregator" completionSize="50"
             completionTimeout="5000">
    <correlationExpression>
      <header>transactionGUID</header>
    </correlationExpression>
    <bean ref="organisationServiceActivator" method="updateOrganisations" />
  </aggregate>
  <choice>
    <when>
      <simple>${body} == 'LAST_LINE'</simple>
      <bean ref="transactionServiceActivator" method="updateTransaction" />
    </when>
  </choice>
</route>

The only problem now is that
transactionServiceActivator.updateTransaction is called before the last
batch of organisations is updated whenever the number of organisations is
not an exact multiple of 50.
For example, starting with a file that has 120 organisations, I get the
following:
 * update organisations 1-50
 * update organisations 51-100
 * update transaction
 * update organisations 101-120



On Tue, Oct 22, 2013 at 3:01 PM, Erwin Etchart <er...@gmail.com>wrote:

> Just for help remember (is a mistake that i made) that every aggregate
> needs its own repository(i loose a few hours with this).

Re: Grouping messages for batch processing

Posted by Erwin Etchart <er...@gmail.com>.
Just a tip (it is a mistake that I made): remember that every aggregate
needs its own repository. I lost a few hours on this.
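
As an illustration of that tip, here is a minimal sketch that gives each aggregator its own repository bean. The bean ids orgRepo and txRepo are made-up names, and the in-memory repository class is the Camel 2.x one; a persistent repository would be wired in the same way. As far as I know, an aggregator that has no aggregationRepositoryRef gets its own in-memory repository by default, so the problem typically shows up when one repository bean is shared explicitly.

```xml
<!-- Sketch only: orgRepo and txRepo are hypothetical bean ids -->
<bean id="orgRepo"
      class="org.apache.camel.processor.aggregate.MemoryAggregationRepository" />
<bean id="txRepo"
      class="org.apache.camel.processor.aggregate.MemoryAggregationRepository" />

<route id="process-organisations">
  <from uri="activemq:organisation.update.queue" />
  <!-- this aggregator uses orgRepo and nothing else does -->
  <aggregate strategyRef="organisationAggregator" aggregationRepositoryRef="orgRepo"
             completionSize="50" completionTimeout="5000">
    <correlationExpression>
      <header>transactionGUID</header>
    </correlationExpression>
    <bean ref="organisationServiceActivator" method="updateOrganisations" />
    <to uri="activemq:organisation.processed.queue" />
  </aggregate>
</route>

<!-- a second <aggregate> in another route would point at its own bean,
     e.g. aggregationRepositoryRef="txRepo", never at orgRepo -->
```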

Regards.



Re: Grouping messages for batch processing

Posted by Claus Ibsen <cl...@gmail.com>.
On Tue, Oct 22, 2013 at 9:32 AM, Tom.Fornoville <to...@roots.be> wrote:
> The questions I have:
> * are 2 routes with aggregators the best way to handle this use case?
> * will the first aggregate keep working or will it stop after the first
> batch of 50?

Yes, it will keep working.

> * what should be the correlation and completion parameters for the second
> aggregation?
>

You need some way to know when a TX is done. I assume that maybe one
of the messages carries a "hey dude, there are no more messages in this TX"
detail that you can use to know when it's done.

Then, in the first route, you can set a special header in your
organisation aggregation strategy to indicate that the TX is done.

Then you can check that header in the 2nd route and call the bean.

Though I am not sure you really need 2 aggregators. Maybe one is enough?
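
A rough sketch of what that second route could look like without a second aggregator: it only filters on the "TX is done" header and calls the bean. The header name transactionDone is a made-up name for illustration, and the sketch assumes the aggregation strategy in the first route sets it on the batch that closes the TX and that the header survives the hop over JMS.

```xml
<route id="aggregate-organisation-results">
  <from uri="activemq:organisation.processed.queue" />
  <!-- transactionDone is a hypothetical header set by the first route -->
  <filter>
    <simple>${header.transactionDone} == true</simple>
    <!-- only reached for the batch that closes the TX -->
    <bean ref="transactionServiceActivator" method="updateTransaction" />
  </filter>
</route>
```

Batches that do not carry the header are simply dropped by the filter, so any per-batch statistics would have to be stored by the first route or accumulated some other way.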





