Posted to users@camel.apache.org by Rajith Muditha Attapattu <ra...@gmail.com> on 2018/08/21 16:28:28 UTC

Enabling transactions for sjms-batch [Was: Transactions (Multiple JMS Consumers) and Aggregation]

Quick clarification.

Do we still need to use the transacted() directive in the DSL, or is it
enough to set transacted=true in the endpoint URI?

Further, does the transaction batch size work independently of
completionSize?
Suppose I set the transaction batch size to x and completionSize to y:

if x = y, then it commits when the aggregation is complete.
if x > y, then it commits after the aggregation is complete.
if x < y then it commits before the aggregation is complete.

Assume from(sjms:myQueue?..).to(jms:myQueue).

For the last two cases, how can the messages be sent to jms:myQueue in a
transactional way? In the last case, if the transaction commits before the
aggregation is complete and sending to the JMS queue fails, wouldn't we
lose messages?

Please clarify.

On Wed, Aug 1, 2018 at 8:41 AM, Rajith Muditha Attapattu <rajith77@gmail.com> wrote:

> Thank you Claus!
>
> The use case is as follows.
> Queue ----> some validation ---> simple transformation --->
> Batch/Aggregate ---> transform to SAP payload ----> SAP
>
> With sjms, we could do
> Queue --> sjms (with batching) ---> iterate over list to do validation -->
> transform to SAP payload ---> SAP.
>                                                                      |
>                                                                      ----> failed messages sent to error queue.
>
> Since sjms supports transactions, I'm assuming that any
> unexpected/unrecoverable errors at any point in the route will roll back the
> transaction.
>
>
> On Wed, Aug 1, 2018 at 8:15 AM, Claus Ibsen <cl...@gmail.com> wrote:
>
>> Hi
>>
>> Yes exactly, and yeah camel-sjms has special support for this kind
>> of use case, which is built directly into its consumer, so it
>> would be just
>>
>> from A (incl aggregate) to B
>>
>>
>> On Wed, Aug 1, 2018 at 2:10 PM, Rajith Muditha Attapattu
>> <ra...@gmail.com> wrote:
>> > Thank you Claus for your quick response.
>> >
>> > Let's say my sending endpoint is another jms queue and considering my
>> > original route....
>> > Are the transaction boundaries as follows?
>> >
>> > from(myQueueA) .transacted() ....... Aggregator ......... to(myQueueB)
>> > <------------------tx------------------------->
>> >                                          <------------------tx------->
>> > I'm assuming this is why a persistent aggregator is recommended to avoid
>> > losing messages.
>> >
>> > The sjms seems like a better fit. I will look into it.
>> > Thank you!
>> >
>> > On Wed, Aug 1, 2018 at 3:32 AM, Claus Ibsen <cl...@gmail.com> wrote:
>> >
>> >> Hi
>> >>
>> >> The aggregate EIP works independently of the original
>> >> route/exchange. So in your example the transacted route hands the
>> >> message off to the aggregator and then continues, but the route ends
>> >> there, so it commits. At the same time the aggregator runs in
>> >> parallel (independently) and aggregates messages; when an aggregate
>> >> is complete, it triggers the aggregated message to be routed
>> >> independently, and that routing happens outside the transaction.
>> >>
>> >> So if you aim to batch N JMS messages into a single aggregate in the
>> >> same TX, then what you are doing there does not work.
>> >>
>> >> For batching JMS messages in a transaction, look at the camel-sjms
>> >> component, which has such a feature (the regular camel-jms does not).
>> >> https://github.com/apache/camel/blob/master/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
>> >>
>> >> And mind that there is also a camel-sjms2 component for the JMS 2.0 API.
>> >>
>> >> For examples of this, I suggest looking at the component's own unit
>> >> tests.
>> >>
>> >> On Wed, Aug 1, 2018 at 2:58 AM, Rajith Muditha Attapattu
>> >> <ra...@gmail.com> wrote:
>> >> > Hi All,
>> >> >
>> >> > I'm trying to understand how transactions would work in the following
>> >> > scenario.
>> >> > Given that transactions are scoped by session, and each concurrent
>> >> > consumer runs in its own session (possibly on different connections, as
>> >> > we see on the wmq connection manager), how do transactions work?
>> >> >
>> >> > When does the transaction manager commit? (My understanding is that it
>> >> > commits when the aggregation is completed and the aggregated message
>> >> > gets sent to the next endpoint.)
>> >> >
>> >> > When it commits, does the transaction manager go through each session
>> >> > and commit?
>> >> >
>> >> > What happens to messages that are still being aggregated?
>> >> > (i.e. once the aggregation completes, the aggregated message gets
>> >> > through, but I'm assuming subsequent messages are in flight and are
>> >> > being aggregated while the previous aggregate is being sent and commit
>> >> > is being called.)
>> >> >
>> >> > Are there any corner cases here? I feel like we may call commit on
>> >> > in-flight messages, and there's a chance of losing messages if the
>> >> > application crashes.
>> >> >
>> >> > from("wmq:myQueue?concurrentConsumers=10")
>> >> >   .transacted()
>> >> >   // some transformation
>> >> >   .aggregate(new MyAggregationStrategy()).constant(true)
>> >> >     .completionSize(Integer.parseInt(AGG_BATCH_SIZE))
>> >> >     .completionInterval(Integer.parseInt(AGG_BATCH_TIMEOUT))
>> >> >   .to("some-other-endpoint");
>> >> >
>> >> > As an aside, we already see that performance is really bad compared to
>> >> > not using transactions. I'm wondering if client-ack could be used in
>> >> > this situation. Hypothetically, we could keep track of the last message
>> >> > (for each session) and call acknowledge on each of those messages.
>> >> > But I'm not sure how to identify that.
>> >> >
>> >> > Regards,
>> >> >
>> >> > Rajith Muditha Attapattu <http://rajith.2rlabs.com/>
>> >>
>> >>
>> >>
>> >> --
>> >> Claus Ibsen
>> >> -----------------
>> >> http://davsclaus.com @davsclaus
>> >> Camel in Action 2: https://www.manning.com/ibsen2
>> >>
>> >
>> >
>> >
>> > --
>> > Regards,
>> >
>> > Rajith Muditha Attapattu <http://rajith.2rlabs.com/>
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> http://davsclaus.com @davsclaus
>> Camel in Action 2: https://www.manning.com/ibsen2
>>
>
>
>
> --
> Regards,
>
> Rajith Muditha Attapattu <http://rajith.2rlabs.com/>
>



-- 
Regards,

Rajith Muditha Attapattu <http://rajith.2rlabs.com/>

Re: Enabling transactions for sjms-batch [Was: Transactions (Multiple JMS Consumers) and Aggregation]

Posted by Claus Ibsen <cl...@gmail.com>.
On Tue, Aug 21, 2018 at 6:28 PM, Rajith Muditha Attapattu
<ra...@gmail.com> wrote:
> Quick clarification.
>
> Do we still need to use the transacted() directive in the DSL, or is it
> enough to set transacted=true in the endpoint URI?
>

That depends. If all you need is JMS transacted ack mode, and no other
resources need to participate in the TX and be able to roll back / commit
as well, then transacted=true is enough. Otherwise you need transacted in
the DSL, and you have to set up the TX manager and all of that.
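
The two options described above can be sketched roughly as follows. This
sketch was not posted in the thread and is only an illustration under
stated assumptions: camel-sjms, camel-jms and camel-spring are on the
classpath; a transaction manager able to coordinate both resources is bound
in the registry under the hypothetical name txManager; orderDao is a
hypothetical bean that writes to a second resource such as a database. The
queue names are placeholders.

```java
import org.apache.camel.builder.RouteBuilder;

public class TransactedRouteSketch extends RouteBuilder {

    @Override
    public void configure() {
        // Option 1: only the JMS session has to commit or roll back, so the
        // endpoint option alone is used and the DSL has no transacted().
        from("sjms:queue:myQueueA?transacted=true")
            .process(exchange -> {
                // Business logic goes here. An exception thrown from this step
                // is expected to roll back the session so the broker redelivers.
            });

        // Option 2: another resource must take part in the same transaction,
        // so the route also uses transacted() backed by a TX manager.
        // "txManager" and "orderDao" are assumed names resolved from the registry.
        from("jms:queue:myQueueB?transacted=true&transactionManager=#txManager")
            .transacted()
            .to("bean:orderDao?method=save");
    }
}
```

In the first route the transaction is local to the JMS session; in the
second, commit and rollback are driven by the TX manager, which is the extra
setup referred to above.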



> Further, does the transaction batch size work independently of
> completionSize?

Which transaction batch size do you mean? Are you talking about some
kind of size on a TX manager or what?

> Suppose I set the transaction batch size to x and completionSize to y:
>
> if x = y, then it commits when the aggregation is complete.
> if x > y, then it commits after the aggregation is complete.
> if x < y then it commits before the aggregation is complete.
>
> Assume from(sjms:myQueue?..).to(jms:myQueue).
>
> For the last two cases, how can the messages be sent to jms:myQueue in a
> transactional way? In the last case, if the transaction commits before the
> aggregation is complete and sending to the JMS queue fails, wouldn't we
> lose messages?
>
> Please clarify.



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2