You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Karolis Petrauskas <k....@gmail.com> on 2012/11/27 21:48:39 UTC

Aggregator and JMS acknowledgment

Hello,

I have a route with the following scheme (mep=InOnly):

    jms -> aggregator -> bean

I need to handle Oracle native types, therefore the JMS endpoint is
implemented by me.
The aggregation is used fro batching messages by count and interval
with constant correlation key.

In this flow, I need to ensure that all the messages are delivered,
therefore my bean endpoint needs to acknowledge messages after
processing them. The bean endpoint receives aggregated messages and
can only acknowledge entire batch (or reject it). The acknowledgement
of the messages can only be implemented by committing the JMS session.
This would acknowledge all the messages received till that time (not
only those, that were aggregated and delivered to the bean endpoint).

If it were no aggregation in the flow, I would implement the
acknowledgement by putting Synchorization object (implemening
org.apache.camel.AsyncCallback) to the message headers. The bean
endpoint would invoke done(false) on such synchronization object in
that way notifying the JMS endpoint to commit the session and proceed
with next message. The synchronization object could be implemented
using a mutex.

Introducing the aggregation complicates the acknowledgement scheme a
bit.One of the possible ways to implement the flow would be to
implement the synchronziation object with a bit richer interface. Such
interface could have operations done() for committing the session and
next() for fetching next message in the current transaction.The
aggregation strategy could invoke the next() method on the
synchronization object each time a message gets aggregated. The method
done() should be invoked by the bean endpoint. Here I came to the main
source of my confusion: what to do with the last message, that can be
fetched after invoking last next() in the batch and before done() in
the bean endpoint?

One of possible solutions I see is to use custom completionPredicate
and implement all the acknowledgement in it. Such predicate should
also reimplement all the logic currently provided by the aggregation
parameters: completionInterval and completionSize. Is this approach
"good enough", or there are better ways for handling such situation?
What could be the way of implementing this, if the JMS component would
be the default one (provided by the Camel)? In my case I cannot use
the XA transactions, but even if I could, it seems it would not helped
me a lot, because of the "first message after the batch" and the
restriction, that the commit acknowledges all messages fetched will
the time of the commit.

Any comments would be helpfull,

Thanks in advance,
Karolis