Posted to users@activemq.apache.org by Robert Nicholson <ro...@gmail.com> on 2009/09/10 08:06:36 UTC

How can ActiveMQ help with our scenario?

So at work we currently have J2SE processes that use JMS
MessageListeners to consume messages from a WebSphere MQ 6.x queue.

At this time I'm enhancing our existing processes to support multiple
JMS MessageListeners, whereas the original implementation used a
single-threaded listener only, i.e. one session, one message listener.
All processing of a message was performed within onMessage, hence the
rate of processing the messages is considerably slow.

Right now I've enhanced this by dispatching each message, after it
arrives, to a thread pool to be processed by a worker thread. The
framework used to drive the thread pool gives us a high level of
concurrency where it matters. This is as opposed to a solution that
creates N listeners where each listener is bound to a dedicated queue;
the publisher would then route the messages to the appropriate queue,
ensuring that each queue receives the correct group of messages.

One of the issues that comes up with this scenario is how to ensure
that a message has been processed before it is removed from the queue.
So we use CLIENT_ACKNOWLEDGE, and because I can only acknowledge from
the delivery thread, I have the worker thread queue up the
acknowledgement; the very last thing the onMessage method does is work
through this "queue" and acknowledge the messages so that they are
removed from the queue.
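
A minimal sketch of that hand-off, for illustration only. `Ackable` is a
hypothetical stand-in for javax.jms.Message (only acknowledge() is
modelled) and `DeferredAckListener` is an invented name; with a real
provider the same shape would sit inside a MessageListener on a
CLIENT_ACKNOWLEDGE session.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for javax.jms.Message; only acknowledge() is modelled. */
interface Ackable {
    void acknowledge();
}

/** Processes messages on a pool but acknowledges them only on the delivery thread. */
class DeferredAckListener {
    private final Executor workers;
    // Messages whose processing has finished and that are waiting to be acknowledged.
    private final ConcurrentLinkedQueue<Ackable> readyToAck = new ConcurrentLinkedQueue<>();

    DeferredAckListener(Executor workers) {
        this.workers = workers;
    }

    /** Called by the delivery thread, in the role of MessageListener.onMessage. */
    void onMessage(Ackable message, Runnable work) {
        workers.execute(() -> {
            work.run();              // if this throws, the message is never queued for ack
            readyToAck.add(message); // the worker only records completion
        });
        drainAcks();                 // last step: acknowledge whatever has completed so far
    }

    /** Must be called from the delivery thread only; returns how many acks were issued. */
    int drainAcks() {
        int count = 0;
        Ackable done;
        while ((done = readyToAck.poll()) != null) {
            done.acknowledge();
            count++;
        }
        return count;
    }
}
```

Note that in this sketch a message that finishes after the last
onMessage call is not acknowledged until the next message arrives.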

The key goal is that you don't want to acknowledge a message from the
queue that hasn't been processed yet. I suppose one of the flaws in
this implementation is that when you acknowledge a message you
implicitly acknowledge all unacknowledged messages the session has
already delivered, so this solution is by no means foolproof. In other
words, it's not only acknowledging the messages I already know I've
processed but also some that I might not have processed, because of
these semantics.
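
The flaw can be shown with a toy model. This is a sketch, not JMS
code: `CumulativeAckSession` and `AckRisk` are invented names, and the
model assumes only that acknowledging one message acknowledges
everything the session has delivered so far.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a CLIENT_ACKNOWLEDGE session; not a real JMS class. */
class CumulativeAckSession {
    // Delivered but not yet acknowledged, in delivery order.
    private final Set<String> unacked = new LinkedHashSet<>();

    void deliver(String messageId) {
        unacked.add(messageId);
    }

    /** Acknowledging one message acknowledges every message delivered so far. */
    List<String> acknowledge(String messageId) {
        if (!unacked.contains(messageId)) {
            throw new IllegalStateException("not outstanding: " + messageId);
        }
        List<String> acked = new ArrayList<>(unacked);
        unacked.clear();
        return acked;
    }
}

class AckRisk {
    /** Ids that were acknowledged although no worker had finished them. */
    static List<String> ackedButUnprocessed(List<String> acked, Set<String> processed) {
        List<String> atRisk = new ArrayList<>(acked);
        atRisk.removeAll(processed);
        return atRisk;
    }
}
```

If m1, m2 and m3 have been delivered and only m2 has finished,
acknowledging m2 also removes m1 and m3, and a crash at that point
would lose both.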

The other approach is to use AUTO_ACKNOWLEDGE and persist each
message's state while it's being worked on, then remove it or mark it
deleted when it has finally been processed. If the consumer needs to be
restarted, it can accurately determine what was acknowledged from the
queue but not yet processed, and resubmit those messages to the worker
threads. On the surface this does seem more reliable than the
"queueing up of acknowledgements" approach.

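One way the persisted-state idea could look, as a sketch under stated
assumptions: `InFlightJournal` is a hypothetical helper (not part of
JMS), payloads are assumed to be single-line text, and a plain
append-only file stands in for whatever store would really be used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.LinkedHashMap;
import java.util.Map;

/** Append-only journal of in-flight work; a hypothetical helper, not part of JMS. */
class InFlightJournal {
    private final Path file;

    InFlightJournal(Path file) {
        this.file = file;
    }

    /** Call inside onMessage before returning, i.e. before the auto-acknowledge happens. */
    synchronized void begin(String messageId, String payload) {
        append("BEGIN\t" + messageId + "\t" + payload);
    }

    /** Call from the worker thread once processing has fully completed. */
    synchronized void done(String messageId) {
        append("DONE\t" + messageId);
    }

    /** On restart: messages that were taken off the queue but never finished. */
    synchronized Map<String, String> pending() {
        Map<String, String> pending = new LinkedHashMap<>();
        if (!Files.exists(file)) {
            return pending;
        }
        try {
            for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                String[] parts = line.split("\t", 3);
                if (parts[0].equals("BEGIN") && parts.length == 3) {
                    pending.put(parts[1], parts[2]);
                } else if (parts[0].equals("DONE") && parts.length >= 2) {
                    pending.remove(parts[1]);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return pending;
    }

    // Note: this sketch does not force writes to disk; a real journal would need to.
    private void append(String line) {
        try {
            Files.write(file, (line + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On restart the consumer would call pending() and resubmit each entry to
the worker threads before attaching its listener again.
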
In any case, part of the reason either approach was conceived is the
limitation that in WebSphere's JMS, MessageListeners attached to the
same QueueSession are all invoked serially, so you take a performance
hit there compared with the truly concurrent approach you get when you
have a farm of MDBs working for you. In our case MDBs aren't an option,
as we aren't using an application server.

How can we use ActiveMQ to increase the throughput of processing from
our queues such that we can utilize our existing concurrency frameworks
and still ensure that no messages are lost, i.e. acknowledged without
having been processed?

If I were to bridge from the WebSphere queue to ActiveMQ and write my
app to consume from ActiveMQ, how does ActiveMQ allow me to rapidly and
reliably consume messages using JMS APIs such that I'm not forced to do
all the work in the delivery thread, but can still accurately keep
track of what's being worked on and what's been processed, so that I
can recover the consumer from an outage without loss of messages?

Re: How can ActiveMQ help with our scenario?

Posted by Rob Davies <ra...@gmail.com>.
Long message - I think what you need is
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE as your acknowledgement mode.
It's ActiveMQ-specific and only in 5.3, but please try a snapshot.
The GA of 5.3 should happen in the near future.
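
To illustrate what per-message acknowledgement buys here, a toy model
follows. It is a sketch, not the ActiveMQ implementation:
`IndividualAckSession` is an invented name, and which thread may call
acknowledge() is not modelled. With the real client the mode would be
requested with connection.createSession(false,
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE), after which
message.acknowledge() applies to that one message.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Toy model of per-message acknowledgement; not the ActiveMQ implementation. */
class IndividualAckSession {
    // Delivered but not yet acknowledged, in delivery order.
    private final Set<String> unacked = new LinkedHashSet<>();

    void deliver(String messageId) {
        unacked.add(messageId);
    }

    /** Acknowledges exactly one message and leaves every other message untouched. */
    void acknowledge(String messageId) {
        if (!unacked.remove(messageId)) {
            throw new IllegalStateException("not outstanding: " + messageId);
        }
    }

    /** What the broker would still hold, and could redeliver, if the consumer died now. */
    Set<String> outstanding() {
        return new LinkedHashSet<>(unacked);
    }
}
```

If m1, m2 and m3 are delivered and a worker finishes m2 first,
acknowledging m2 leaves m1 and m3 outstanding, so an outage at that
point loses neither of them.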

Rob Davies
I work here: http://fusesource.com
My Blog: http://rajdavies.blogspot.com/
I'm writing this: http://www.manning.com/snyder/