You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by retok <re...@profidatagroup.com> on 2013/08/06 11:07:44 UTC

Stop and resume message consuming

We have a use case where there is a single queue that has multiple message
consumers. It is important that the messages are consumed in
first-in-first-out order and message consuming must stop when a message
cannot be acknowledged (e.g. due to an application/database error) and must
not be resumed until the problem has been solved. Consuming must be resumed
with the not acknowledged message.

To guarantee ordering when multiple consumers are around, we assume that we
need to use the exclusive consumer feature of ActiveMQ. To have control over
message acknowledgement, we assume that we need to use
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE.

But now, how can we achieve that message consuming is stopping whenever we
do not acknowledge a message? 
And how can we resume consuming messages after the application/database
error has been resolved (not knowing which consumer was the exclusive
consumer)?

None of the consumer needs to die upon an error of the application/database
behind, so failover is not the topic of this question.

Thanks for any help,
Reto



--
View this message in context: http://activemq.2283324.n4.nabble.com/Stop-and-resume-message-consuming-tp4670118.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Stop and resume message consuming

Posted by Christian Posta <ch...@gmail.com>.
Take a look here:

https://gist.github.com/christian-posta/6211175

Key is to use prefetch == 1 and   factory.setAlwaysSessionAsync(true) on
the consumer.

Cheers,
Christian






On Mon, Aug 12, 2013 at 5:20 AM, retok <re...@profidatagroup.com>wrote:

> JmsTest.zip <
> http://activemq.2283324.n4.nabble.com/file/n4670280/JmsTest.zip>
>
> I attached an example with a simple broker service, a producer and a
> consumer.
> The producer simply sends 30 messages and the consumer does not acknowledge
> every 3rd message.
>
> The effect is, that the first time, message 1 and 2 get acked as expected.
> Message 3 does not get acked as expected. But message 4 also gets through
> and gets acked --> This should not happen.
> I observed, that it only happens the first time after starting the test
> apps, so after message 6, 9, 12, etc. it does not happen anymore.
>
> I also observed, that I can prevent this from happening the first time,
> when
> I stop() the consumer when a message is not acked and start() the consumer
> after recovery (lines 72 and 101 in my example TestJmsConsumer.java). Might
> this be the solution?
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Stop-and-resume-message-consuming-tp4670118p4670280.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta

Re: Stop and resume message consuming

Posted by retok <re...@profidatagroup.com>.
JmsTest.zip <http://activemq.2283324.n4.nabble.com/file/n4670280/JmsTest.zip>  

I attached an example with a simple broker service, a producer and a
consumer.
The producer simply sends 30 messages and the consumer does not acknowledge
every 3rd message.

The effect is, that the first time, message 1 and 2 get acked as expected.
Message 3 does not get acked as expected. But message 4 also gets through
and gets acked --> This should not happen.
I observed, that it only happens the first time after starting the test
apps, so after message 6, 9, 12, etc. it does not happen anymore.

I also observed, that I can prevent this from happening the first time, when
I stop() the consumer when a message is not acked and start() the consumer
after recovery (lines 72 and 101 in my example TestJmsConsumer.java). Might
this be the solution?



--
View this message in context: http://activemq.2283324.n4.nabble.com/Stop-and-resume-message-consuming-tp4670118p4670280.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Stop and resume message consuming

Posted by Christian Posta <ch...@gmail.com>.
Do you have a test case? I can look at your test case and see exactly
what's going on.


On Wed, Aug 7, 2013 at 2:27 AM, retok <re...@profidatagroup.com> wrote:

> Thanks for your reply. I have tried to set prefetch to 0. But this does not
> work for async sessions. I have to set it to a minimum of 1 because I would
> like to use the MessageListener concept and do not want to construct a sync
> session where I have to call receive() myself.
>
> Now, the effect of this is strange. If I have a couple of messages, say 1
> to
> 10, where message 3 is not acked, then I still get message 4 before my
> consumer does not proceed. Then, after recover(), I get messages 3
> (redelivery), 5 and 6. If I do not ack message 6, I still get message 7,
> bevore the consumer stops. And so on.
>
> So the message order in my app is not guaranteed: 1, 2, 4, recover() 3, 5,
> 7, recover() 6, 8, ...
> Any further hints?
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Stop-and-resume-message-consuming-tp4670118p4670141.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta

Re: Stop and resume message consuming

Posted by retok <re...@profidatagroup.com>.
Thanks for your reply. I have tried to set prefetch to 0. But this does not
work for async sessions. I have to set it to a minimum of 1 because I would
like to use the MessageListener concept and do not want to construct a sync
session where I have to call receive() myself.

Now, the effect of this is strange. If I have a couple of messages, say 1 to
10, where message 3 is not acked, then I still get message 4 before my
consumer does not proceed. Then, after recover(), I get messages 3
(redelivery), 5 and 6. If I do not ack message 6, I still get message 7,
bevore the consumer stops. And so on.

So the message order in my app is not guaranteed: 1, 2, 4, recover() 3, 5,
7, recover() 6, 8, ...
Any further hints?



--
View this message in context: http://activemq.2283324.n4.nabble.com/Stop-and-resume-message-consuming-tp4670118p4670141.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Stop and resume message consuming

Posted by Christian Posta <ch...@gmail.com>.
I think you're on the right track. As long as you're using exclusive
consumer, only one consumer will be actively consuming from the queue at a
time. You could use individual ack, or you could also use transacted
sessions. By not sending the ack back, your consumer will not proceed to
the next message. You could even set prefetch low (0 or 1) so that messages
don't get dispatched to the broker until the consumer has ack'd the
previous message.


On Tue, Aug 6, 2013 at 2:07 AM, retok <re...@profidatagroup.com> wrote:

> We have a use case where there is a single queue that has multiple message
> consumers. It is important that the messages are consumed in
> first-in-first-out order and message consuming must stop when a message
> cannot be acknowledged (e.g. due to an application/database error) and must
> not be resumed until the problem has been solved. Consuming must be resumed
> with the not acknowledged message.
>
> To guarantee ordering when multiple consumers are around, we assume that we
> need to use the exclusive consumer feature of ActiveMQ. To have control
> over
> message acknowledgement, we assume that we need to use
> ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE.
>
> But now, how can we achieve that message consuming is stopping whenever we
> do not acknowledge a message?
> And how can we resume consuming messages after the application/database
> error has been resolved (not knowing which consumer was the exclusive
> consumer)?
>
> None of the consumer needs to die upon an error of the application/database
> behind, so failover is not the topic of this question.
>
> Thanks for any help,
> Reto
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Stop-and-resume-message-consuming-tp4670118.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta