You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Jonathan Schoreels <jo...@gmail.com> on 2017/05/03 15:53:53 UTC

Aggregations, Transactions & "Exchange Processed"

Hi,

I'm a bit confused with the mechanic behind aggregation, transaction & the
"processed" state of an exchange.

*tldr : go the the bold part at the bottom.*

Let me introduce a small scenario :

I have two consumers that poll two different jms queues. Those two
consumers are using a JDBCAggregationRepository as an
AggregationRepository. Let call them consumer1 & consumer2. When the
JDBCAggregationRepository receive the two message, a third is created,
resp. message1 (polled by consumer1), message2 (polled by consumer2),
message3 (send by the aggregation strategy)

Imagine the following events :
1. message1 is the first to be polled, the AggregStrategy makes a little
bit of mapping and stores it in the repository.
2. message 2 is second to be polled, the AggregStrategy retrieves the
"oldExchange" from the repo, combines the newExchange with it, and then the
predicate match.
3. message 3 is sent, since the predicate was matched.

If message1 was in "transaction1", and message2 in "transaction2", can I
assume that message3 is covered in "transaction2", or does a new
transaction is created ?

Moreover, for some component such as sql, we have some option like
"onConsume" that are statement triggered "when the row is "processed". But
is "being aggregated" "processed", or the following route needs to be done
(It seems with my test that it occurs at the end of the route).

What is confusing me is that if I have such a scenario where I synchronize
a SQL row correlated to a JMS message, in a JDBC repo, the "onConsume"
statement could be executed sooner (if the first to be store in the repo),
or at the end of the route if it was the second one to come.
I had the problem with the following pseudo-route :
1
.from(sql:select state=PENDING?onConsume=update state=PROCESSING")
.to(direct:aggreg)

2.
from(file:input)
.to(direct:aggreg)

3.
from(direct:aggreg)
.aggreg(jdbc)
.completionSize(2)
.to(sql:update state=COMPLETED)


IF the sql row was the first to come, the order of state was "PENDING ->
PROCESSING -> COMPLETED".
If it was the second, the order was "PENDING -> (?? COMPLETED ??) ->
PROCESSING. (I did not see COMPLETED but I assume that was appened).


*Do you have "patterns" to use aggregation strategy "symmetrically" ? For
example, to ensure that exchanges in route 1 & 2 are considered "processed"
and that a fresh new one comes from the repo ? Often, we can see
"if(oldExchange == null) exchange = newExchange", but is it really the best
way to implements aggregation strategy ? Shouldn't we create a whole new
exchange, with its own transaction & lifecycle ?*
Thank you in advance.

Jonathan Schoreels

Re: Aggregations, Transactions & "Exchange Processed"

Posted by Claus Ibsen <cl...@gmail.com>.
Ad 3)
The output of the aggregation is not "connected" to the input, so its
not able to run under same transaction etc.

On Wed, May 3, 2017 at 11:53 AM, Jonathan Schoreels
<jo...@gmail.com> wrote:
> Hi,
>
> I'm a bit confused with the mechanic behind aggregation, transaction & the
> "processed" state of an exchange.
>
> *tldr : go the the bold part at the bottom.*
>
> Let me introduce a small scenario :
>
> I have two consumers that poll two different jms queues. Those two
> consumers are using a JDBCAggregationRepository as an
> AggregationRepository. Let call them consumer1 & consumer2. When the
> JDBCAggregationRepository receive the two message, a third is created,
> resp. message1 (polled by consumer1), message2 (polled by consumer2),
> message3 (send by the aggregation strategy)
>
> Imagine the following events :
> 1. message1 is the first to be polled, the AggregStrategy makes a little
> bit of mapping and stores it in the repository.
> 2. message 2 is second to be polled, the AggregStrategy retrieves the
> "oldExchange" from the repo, combines the newExchange with it, and then the
> predicate match.
> 3. message 3 is sent, since the predicate was matched.
>
> If message1 was in "transaction1", and message2 in "transaction2", can I
> assume that message3 is covered in "transaction2", or does a new
> transaction is created ?
>
> Moreover, for some component such as sql, we have some option like
> "onConsume" that are statement triggered "when the row is "processed". But
> is "being aggregated" "processed", or the following route needs to be done
> (It seems with my test that it occurs at the end of the route).
>
> What is confusing me is that if I have such a scenario where I synchronize
> a SQL row correlated to a JMS message, in a JDBC repo, the "onConsume"
> statement could be executed sooner (if the first to be store in the repo),
> or at the end of the route if it was the second one to come.
> I had the problem with the following pseudo-route :
> 1
> .from(sql:select state=PENDING?onConsume=update state=PROCESSING")
> .to(direct:aggreg)
>
> 2.
> from(file:input)
> .to(direct:aggreg)
>
> 3.
> from(direct:aggreg)
> .aggreg(jdbc)
> .completionSize(2)
> .to(sql:update state=COMPLETED)
>
>
> IF the sql row was the first to come, the order of state was "PENDING ->
> PROCESSING -> COMPLETED".
> If it was the second, the order was "PENDING -> (?? COMPLETED ??) ->
> PROCESSING. (I did not see COMPLETED but I assume that was appened).
>
>
> *Do you have "patterns" to use aggregation strategy "symmetrically" ? For
> example, to ensure that exchanges in route 1 & 2 are considered "processed"
> and that a fresh new one comes from the repo ? Often, we can see
> "if(oldExchange == null) exchange = newExchange", but is it really the best
> way to implements aggregation strategy ? Shouldn't we create a whole new
> exchange, with its own transaction & lifecycle ?*
> Thank you in advance.
>
> Jonathan Schoreels



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2