You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Eugene <eu...@gmail.com> on 2011/11/21 15:13:30 UTC

ActiveMQ Dispatcher Policy

Hey,

Small question here : suppose I have two Consumers (DMLC in Spring - started
in two different application contexts - two Java VMs), both "listen" to the
same Queue.

Now I want to keep the DMLC Container started, but at the same time I want
to be able to disable one of them. And when I say disable I mean not receive
messages at all in the Queue - so stop() from DMLC will not work. Also
shutdown will not work either because it will shutdown the DMLC and the idea
is that the process should be manageable from JMX - so stopping it is not
really an option.

My idea was this - which actually works:

Extend the DMLC Container and add a flag : "disableMessageConsuming" - true
or false:

public class CustomDefaultMessageListenerContainer extends
DefaultMessageListenerContainer {

    private boolean disableMessageConsuming;

    public boolean isDisableMessageConsuming() {
        return disableMessageConsuming;
    }

    public void setDisableMessageConsuming(boolean disableMessageConsuming)
{
        this.disableMessageConsuming = disableMessageConsuming;
       
((MyMessageListener)super.getMessageListener()).setDisableMessageConsuming(disableMessageConsuming);
    }
}

Of course the same variable is present in MyMessageListener, which is just a
class that implements MessageListener - nothing fancy.

Inside MyMessageListener I just check the flag and if it's set to true and
because of the ACKNOWLEDGMENT the original message is already gone from the
Queue, so I recreate another one with the same MsgID and payload.

But this looks way too hacky to me - and probably wrong.

What I wanted to do is check the flag and throw and error, so that the
message get rolled back to the Queue. This, again, happens, BUT it seems
that ActiveMQ somehow registers it's Consumers to a message and thus this
already rolled back message can not really be taken by some other consumer.

Is there a way to unregister a message from a particular consumer?

Thx,
Eugene. 

--
View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dispatcher-Policy-tp4091599p4091599.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply:Re: Reply:Re: Reply:Re: Reply:ActiveMQ Dispatcher Policy

Posted by SuoNayi <su...@163.com>.
Because in this case the DMLC is shut down normally.Messages will be acked before consumers are closed.
It seems that you have to program to implement the mechanism you want by youself.




At 2011-11-22 21:17:27,Eugene <eu...@gmail.com> wrote:
>Yup - don't know why I said shut down the JVM. My bad. 
>What I wanted to say is that even if you shutdown/destroy the DMLC - the
>message is NOT consumed by the other DMLC. IT IS Consumed only if I kill the
>JVM that the now shutdown/destroyed DMLC was running in.
>I just re-tested this.
>
>Eugene. 
>
>--
>View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dispatcher-Policy-tp4091599p4095619.html
>Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Reply:Re: Reply:Re: Reply:ActiveMQ Dispatcher Policy

Posted by Eugene <eu...@gmail.com>.
Yup - don't know why I said shut down the JVM. My bad. 
What I wanted to say is that even if you shutdown/destroy the DMLC - the
message is NOT consumed by the other DMLC. IT IS Consumed only if I kill the
JVM that the now shutdown/destroyed DMLC was running in.
I just re-tested this.

Eugene. 

--
View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dispatcher-Policy-tp4091599p4095619.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply:Re: Reply:Re: Reply:ActiveMQ Dispatcher Policy

Posted by SuoNayi <su...@163.com>.
stop on the DMLC will pause the consumers (not close)and stop the transport of the connection,not exit the entire JVM.

shutdown on the DMLC will close all consumers and the underlying connection as well,not exit the entire JVM too.
note that the messageListener is referenced all the time.when you call start or initialize the DMLC, the messageListener will be used directly.
These behaves can be crontrolled via jmx as well,because no matter which action you choses the JVM is alive all the time.


At 2011-11-22 16:36:10,Eugene <eu...@gmail.com> wrote:
>stop is not really an option. stop will stop the MessageListener and not the
>actual DMLC.
>Restart either - if you call shutdown, I loose the entire JVM, thus no
>controll after that with JMX.
>So, my solution is to actually re-send the message to the Queue once they
>end-up in the Listener that is not supposed to take them. From there you can
>add an additional header for example and check it, so that you do not really
>keep sending the same message for infinite times to the same Listener (if
>other are not started).
>
>Cheers,
>Eugene.
>
>--
>View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dispatcher-Policy-tp4091599p4094882.html
>Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Reply:Re: Reply:ActiveMQ Dispatcher Policy

Posted by Eugene <eu...@gmail.com>.
stop is not really an option. stop will stop the MessageListener and not the
actual DMLC.
Restart either - if you call shutdown, I loose the entire JVM, thus no
controll after that with JMX.
So, my solution is to actually re-send the message to the Queue once they
end-up in the Listener that is not supposed to take them. From there you can
add an additional header for example and check it, so that you do not really
keep sending the same message for infinite times to the same Listener (if
other are not started).

Cheers,
Eugene.

--
View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dispatcher-Policy-tp4091599p4094882.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply:Re: Reply:ActiveMQ Dispatcher Policy

Posted by SuoNayi <su...@163.com>.
As you said, you could stop the DMLC which implements interface Lifecycle or restart it via jmx command.




At 2011-11-22 02:03:35,Eugene <eu...@gmail.com> wrote:
>So what your are trying to say is what I have mentioned: run-time this is not
>possible. You have to manually shut down the DMLC, which kind-of sucks :(
>
>--
>View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dispatcher-Policy-tp4091599p4092470.html
>Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Reply:ActiveMQ Dispatcher Policy

Posted by Eugene <eu...@gmail.com>.
So what your are trying to say is what I have mentioned: run-time this is not
possible. You have to manually shut down the DMLC, which kind-of sucks :(

--
View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dispatcher-Policy-tp4091599p4092470.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply:ActiveMQ Dispatcher Policy

Posted by SuoNayi <su...@163.com>.
The broker will dispatch messages to consumers in batches before client's ack, then the client will dispatch each message to consumer who is interesting in.
Messages that process unsuccessfully, says messageListener throws RuntimeException, will be redispatched to messageLisener by client if not out of the max redelivery numbers or client will tell broker the message is a poison message and will be deliveyed to DLQ.
When the consumer is closed the broker will redispatch its unacked messages to other active consumers.
Hope this will be helpful to you.




At 2011-11-21 22:13:30,Eugene <eu...@gmail.com> wrote:
>Hey,
>
>Small question here : suppose I have two Consumers (DMLC in Spring - started
>in two different application contexts - two Java VMs), both "listen" to the
>same Queue.
>
>Now I want to keep the DMLC Container started, but at the same time I want
>to be able to disable one of them. And when I say disable I mean not receive
>messages at all in the Queue - so stop() from DMLC will not work. Also
>shutdown will not work either because it will shutdown the DMLC and the idea
>is that the process should be manageable from JMX - so stopping it is not
>really an option.
>
>My idea was this - which actually works:
>
>Extend the DMLC Container and add a flag : "disableMessageConsuming" - true
>or false:
>
>public class CustomDefaultMessageListenerContainer extends
>DefaultMessageListenerContainer {
>
>    private boolean disableMessageConsuming;
>
>    public boolean isDisableMessageConsuming() {
>        return disableMessageConsuming;
>    }
>
>    public void setDisableMessageConsuming(boolean disableMessageConsuming)
>{
>        this.disableMessageConsuming = disableMessageConsuming;
>       
>((MyMessageListener)super.getMessageListener()).setDisableMessageConsuming(disableMessageConsuming);
>    }
>}
>
>Of course the same variable is present in MyMessageListener, which is just a
>class that implements MessageListener - nothing fancy.
>
>Inside MyMessageListener I just check the flag and if it's set to true and
>because of the ACKNOWLEDGMENT the original message is already gone from the
>Queue, so I recreate another one with the same MsgID and payload.
>
>But this looks way too hacky to me - and probably wrong.
>
>What I wanted to do is check the flag and throw and error, so that the
>message get rolled back to the Queue. This, again, happens, BUT it seems
>that ActiveMQ somehow registers it's Consumers to a message and thus this
>already rolled back message can not really be taken by some other consumer.
>
>Is there a way to unregister a message from a particular consumer?
>
>Thx,
>Eugene. 
>
>--
>View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dispatcher-Policy-tp4091599p4091599.html
>Sent from the ActiveMQ - User mailing list archive at Nabble.com.