Posted to users@camel.apache.org by Rajith Muditha Attapattu <ra...@gmail.com> on 2018/08/01 00:58:43 UTC

Transactions (Multiple JMS Consumers) and Aggregation

Hi All,

I'm trying to understand how transactions would work in the following
scenario.
Given that transactions are scoped by session, and each concurrent consumer
runs in its own session (possibly on different connections, as we see
in the wmq connection manager), how do transactions work?

When does the transaction manager commit? (my understanding is when the
aggregation is completed and the aggregated message gets sent to the next
endpoint).

When it commits, does the transaction manager go through each session and
commit?

What happens to messages that are still being aggregated?
(i.e., once the aggregation completes, the aggregated message gets through,
but I'm assuming subsequent messages are in flight and being aggregated
while the previous aggregate is being sent and commit is being called.)

Are there any corner cases here? I feel like we may call commit on in-flight
messages, and there's a chance of losing messages if the application
crashes.

from("wmq:myQueue?concurrentConsumers=10")
    .transacted()
    // some transformation
    // constant(true) puts every message into the same aggregation group
    .aggregate(new MyAggregationStrategy()).constant(true)
        .completionSize(Integer.parseInt(AGG_BATCH_SIZE))
        .completionInterval(Integer.parseInt(AGG_BATCH_TIMEOUT))
    .to("some-other-endpoint");

As an aside, we already see that performance is much worse than without
transactions. I'm wondering whether client-ack could be used in this
situation. Hypothetically, we could keep track of the last message for each
session and call acknowledge on each of those messages.
But I'm not sure how to identify that message.
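
For context on the client-ack idea: in JMS CLIENT_ACKNOWLEDGE mode, calling
acknowledge on one message acknowledges every message the session has
consumed so far, not only that message. The plain-Java sketch below models
just that rule. ToySession and its methods are hypothetical stand-ins, not
the JMS or WMQ API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of one JMS session in CLIENT_ACKNOWLEDGE mode.
// It is not the javax.jms API; it only mirrors the acknowledgement rule.
class ToySession {
    private final List<String> unacked = new ArrayList<>();
    private final List<String> acked = new ArrayList<>();

    // The broker hands a message to this session's consumer.
    void deliver(String messageId) {
        unacked.add(messageId);
    }

    // Acknowledging any pending message acknowledges everything delivered
    // so far on this session, which is why tracking the last one is enough.
    void acknowledge(String messageId) {
        if (!unacked.contains(messageId)) {
            throw new IllegalArgumentException("not pending on this session: " + messageId);
        }
        acked.addAll(unacked);
        unacked.clear();
    }

    // Messages the broker would redeliver if the application crashed now.
    List<String> redeliveredAfterCrash() {
        return new ArrayList<>(unacked);
    }

    List<String> acknowledged() {
        return new ArrayList<>(acked);
    }
}
```

Under this model, each of the 10 concurrent consumers would need its own
tracking, since an acknowledge call never reaches messages consumed on a
different session.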

Regards,

Rajith Muditha Attapattu <http://rajith.2rlabs.com/>

Re: Transactions (Multiple JMS Consumers) and Aggregation

Posted by Rajith Muditha Attapattu <ra...@gmail.com>.
Thank you Claus!

The use case is as follows.
Queue ----> some validation ---> simple transformation ---> Batch/Aggregate
---> transform to SAP payload ----> SAP

With sjms, we could do
Queue --> sjms (with batching) --> iterate over list to do validation
                                       |
                                       +--> transform to SAP payload --> SAP
                                       |
                                       +--> failed messages sent to error queue

Since sjms supports transactions, I'm assuming that any
unexpected/unrecoverable error at any point in the route will roll back the
transaction.
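
The sjms flow above can be pictured with a small plain-Java sketch: one
batch arrives as a list, each entry is validated, failures are set aside for
the error queue, and an unexpected exception abandons the whole batch so the
transaction can roll back. All names here (BatchSketch, isValid,
toSapPayload, the "POISON" marker) are hypothetical, and no Camel or SAP API
is used.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the batch step; none of these names come from Camel.
class BatchSketch {
    final List<String> errorQueue = new ArrayList<>();
    final List<String> sentToSap = new ArrayList<>();

    // Placeholder validation rule: blank messages are invalid.
    static boolean isValid(String message) {
        return message != null && !message.trim().isEmpty();
    }

    // Placeholder transformation standing in for the SAP payload mapping.
    static String toSapPayload(List<String> valid) {
        return "SAP[" + String.join(",", valid) + "]";
    }

    // Processes one batch. Returns true to signal commit, false to signal rollback.
    boolean process(List<String> batch) {
        List<String> valid = new ArrayList<>();
        List<String> failed = new ArrayList<>();
        try {
            for (String message : batch) {
                if ("POISON".equals(message)) {
                    // Stands in for an unexpected, unrecoverable error.
                    throw new IllegalStateException("unrecoverable error");
                }
                if (isValid(message)) {
                    valid.add(message);
                } else {
                    failed.add(message);
                }
            }
        } catch (RuntimeException e) {
            // Nothing has been published yet, so the whole batch can be redelivered.
            return false;
        }
        // Side effects happen only once the whole batch has been examined.
        errorQueue.addAll(failed);
        if (!valid.isEmpty()) {
            sentToSap.add(toSapPayload(valid));
        }
        return true;
    }
}
```

Whether the SAP call itself can be undone on rollback depends on whether it
takes part in the transaction, which I have not confirmed.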


On Wed, Aug 1, 2018 at 8:15 AM, Claus Ibsen <cl...@gmail.com> wrote:

> Hi
>
> Yes, exactly. camel-sjms has special support for this kind of use case,
> built directly into its consumer, so it would be just
>
> from A (incl aggregate) to B

Re: Transactions (Multiple JMS Consumers) and Aggregation

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Yes, exactly. camel-sjms has special support for this kind of use case,
built directly into its consumer, so it would be just

from A (incl aggregate) to B


On Wed, Aug 1, 2018 at 2:10 PM, Rajith Muditha Attapattu
<ra...@gmail.com> wrote:
> Thank you Claus for your quick response.
>
> Let's say my sending endpoint is another JMS queue. Considering my
> original route, are the transaction boundaries as follows?
>
> from(myQueueA) .transacted() ....... Aggregator ......... to(myQueueB)
> <------------------tx------------------------->
>                                          <------------------tx------->
> I'm assuming this is why a persistent aggregator is recommended, to avoid
> losing messages.
>
> The sjms component seems like a better fit. I will look into it.
> Thank you!



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

Re: Transactions (Multiple JMS Consumers) and Aggregation

Posted by Rajith Muditha Attapattu <ra...@gmail.com>.
Thank you Claus for your quick response.

Let's say my sending endpoint is another JMS queue. Considering my
original route, are the transaction boundaries as follows?

from(myQueueA) .transacted() ....... Aggregator ......... to(myQueueB)
<------------------tx------------------------->
                                         <------------------tx------->
I'm assuming this is why a persistent aggregator is recommended, to avoid
losing messages.

The sjms component seems like a better fit. I will look into it.
Thank you!

On Wed, Aug 1, 2018 at 3:32 AM, Claus Ibsen <cl...@gmail.com> wrote:

> Hi
>
> The aggregate EIP works independently of the original
> route/exchange. So in your example, each message is handed off to the
> aggregator and the route then continues, but the route ends there, so
> the transaction commits at that point. Meanwhile, the aggregator runs
> in parallel (independently) and aggregates messages; when an aggregate
> completes, it is routed onward independently, and that routing happens
> outside a transaction.
>
> So if you aim to batch N JMS messages into a single aggregate in the
> same TX, what you are doing there will not work.
>
> For batching JMS messages in a transaction, look at the camel-sjms
> component, which has such a feature (the regular camel-jms does not).
> https://github.com/apache/camel/blob/master/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
>
> Also note there is a camel-sjms2 component for the JMS 2.0 API.
>
> For examples, I suggest looking at the component's own unit tests.

Re: Transactions (Multiple JMS Consumers) and Aggregation

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

The aggregate EIP works independently of the original
route/exchange. So in your example, each message is handed off to the
aggregator and the route then continues, but the route ends there, so
the transaction commits at that point. Meanwhile, the aggregator runs
in parallel (independently) and aggregates messages; when an aggregate
completes, it is routed onward independently, and that routing happens
outside a transaction.

So if you aim to batch N JMS messages into a single aggregate in the
same TX, what you are doing there will not work.
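
To make the gap concrete, here is a toy plain-Java model of that behaviour,
assuming an in-memory aggregation buffer and completion by size only. Each
message is committed (gone from the queue for good) as soon as it is handed
to the aggregator, so a crash before the batch completes loses whatever is
buffered. HandoffModel and its methods are hypothetical and do not use Camel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model; it mimics the behaviour described above, not Camel internals.
class HandoffModel {
    final Deque<String> queue = new ArrayDeque<>();          // messages still on the broker
    final List<String> buffer = new ArrayList<>();           // in-memory aggregation state
    final List<List<String>> delivered = new ArrayList<>();  // completed aggregates sent on
    private final int completionSize;

    HandoffModel(int completionSize, List<String> messages) {
        this.completionSize = completionSize;
        queue.addAll(messages);
    }

    // One transacted consume: hand off to the aggregator, then commit.
    void consumeOne() {
        String message = queue.poll(); // commit: the broker forgets the message here
        if (message == null) {
            return;
        }
        buffer.add(message);
        if (buffer.size() == completionSize) {
            // Completion is routed onward outside the consumer's transaction.
            delivered.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    // A crash wipes the in-memory buffer; returns the messages that are lost.
    List<String> crash() {
        List<String> lost = new ArrayList<>(buffer);
        buffer.clear();
        return lost;
    }
}
```

With a completion size of 3 and five messages consumed, the model delivers
one aggregate and holds two messages in the buffer. After a crash those two
are neither on the queue nor delivered, which is the corner case asked about
in the original post.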

For batching JMS messages in a transaction, look at the camel-sjms
component, which has such a feature (the regular camel-jms does not).
https://github.com/apache/camel/blob/master/components/camel-sjms/src/main/docs/sjms-batch-component.adoc

Also note there is a camel-sjms2 component for the JMS 2.0 API.

For examples, I suggest looking at the component's own unit tests.

On Wed, Aug 1, 2018 at 2:58 AM, Rajith Muditha Attapattu
<ra...@gmail.com> wrote:
> Hi All,
>
> I'm trying to understand how transactions would work in the following
> scenario.
> Given that transactions are scoped by session, and each concurrent consumer
> runs in its own session (possibly on different connections, as we see
> in the wmq connection manager), how do transactions work?
>
> When does the transaction manager commit? (my understanding is when the
> aggregation is completed and the aggregated message gets sent to the next
> endpoint).
>
> When it commits, does the transaction manager go through each session and
> commit?
>
> What happens to messages that are still being aggregated?
> (i.e., once the aggregation completes, the aggregated message gets through,
> but I'm assuming subsequent messages are in flight and being aggregated
> while the previous aggregate is being sent and commit is being called.)
>
> Are there any corner cases here? I feel like we may call commit on in-flight
> messages, and there's a chance of losing messages if the application
> crashes.
>
> from("wmq:myQueue?concurrentConsumers=10")
>     .transacted()
>     // some transformation
>     // constant(true) puts every message into the same aggregation group
>     .aggregate(new MyAggregationStrategy()).constant(true)
>         .completionSize(Integer.parseInt(AGG_BATCH_SIZE))
>         .completionInterval(Integer.parseInt(AGG_BATCH_TIMEOUT))
>     .to("some-other-endpoint");
>
> As an aside, we already see that performance is much worse than without
> transactions. I'm wondering whether client-ack could be used in this
> situation. Hypothetically, we could keep track of the last message for each
> session and call acknowledge on each of those messages.
> But I'm not sure how to identify that message.
>
> Regards,
>
> Rajith Muditha Attapattu <http://rajith.2rlabs.com/>


