Posted to users@rocketmq.apache.org by Adam Warski <ad...@warski.org> on 2017/10/25 07:46:34 UTC

Multiple consumers for the same topic

Hello,

I have two consumers running (on different hosts), trying to consume messages from a single topic, but only one of them receives messages.
Here's how my consumer is configured:

consumer = new DefaultMQPushConsumer("my_consumer_group")
consumer.setNamesrvAddr(nameServers)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET)
consumer.setMessageModel(MessageModel.CLUSTERING)
consumer.subscribe("my_topic", "*")
consumer.registerMessageListener(new MessageListenerConcurrently {
  ...
})

The configuration is identical on all consumers.

Is there some configuration on the topic that needs to be done so that the messages are load-balanced evenly between the consumers?

Thanks,
Adam

-- 
Adam Warski
http://www.softwaremill.com
http://twitter.com/#!/adamwarski


Re: Multiple consumers for the same topic

Posted by Adam Warski <ad...@warski.org>.
Hello,

Sorry for the late reply, I got quite side-tracked :) And thanks for the answer!

> If all your messages were delivered to the first queue specified by your producers, for sure only one consumer may consume messages.

Hm, I'm sending all messages to a single topic: mqperf. Does that mean only a single consumer will receive the messages? That would explain the behaviour I am seeing.

But you also wrote:

> No. If the message model is CLUSTERING, messages are automatically load-balanced among consumers of the same consumer group. There is nothing more to do.

which would suggest that if there are multiple consumers for a given topic, all consumers will receive the same number of messages?

Here's the output of the commands you advised running:

[ec2-user@rocketmq_broker_master bin]$ ./mqadmin consumerConnection -g mqperf_consumer_group -n 172.22.1.197:9876
OpenJDK 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
001  172.22.1.35@instance594          172.22.1.35:57196      JAVA     V4_1_0_SNAPSHOT
002  172.22.1.33@instance667          172.22.1.33:56618      JAVA     V4_1_0_SNAPSHOT

Below is subscription:001  Topic: mqperf                                   SubExpression: *
002  Topic: %RETRY%mqperf_consumer_group             SubExpression: *
ConsumeType: CONSUME_PASSIVELY
MessageModel: CLUSTERING
ConsumeFromWhere: CONSUME_FROM_FIRST_OFFSET

[ec2-user@rocketmq_broker_master bin]$ ./mqadmin consumerProgress -g mqperf_consumer_group -n 172.22.1.197:9876
OpenJDK 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
#Topic                            #Broker Name                      #QID  #Broker Offset        #Consumer Offset      #Client IP           #Diff                 #LastTime
%RETRY%mqperf_consumer_group      mqperf                            0     0                     0                     172.22.1.33          0                     1970-01-01 00:00:00
mqperf                            mqperf                            0     2539542               2276359               172.22.1.33          263183                2017-11-14 12:34:39

Consume TPS: 3343.25
Diff Total: 263183
[ec2-user@rocketmq_broker_master bin]$

So there are two consumers (as there should be), but only one is making progress? Any ideas why that could be?

By the way, here's the full code I'm using for sending/receiving messages: https://github.com/softwaremill/mqperf/blob/master/src/main/scala/com/softwaremill/mqperf/mq/RocketMq.scala

Thanks!
Adam

-- 
Adam Warski
http://www.softwaremill.com
http://twitter.com/#!/adamwarski


Re: Multiple consumers for the same topic

Posted by Zhanhui Li <li...@gmail.com>.
Hi Adam,

> Is there some configuration on the topic that needs to be done so that the messages are load-balanced evenly between the consumers?

No. If the message model is CLUSTERING, messages are automatically load-balanced among consumers of the same consumer group. There is nothing more to do.

To diagnose the problem, you can use the mqadmin tool as follows:

bash mqadmin consumerConnection -g CG_QuickStart -n localhost:9876

Here CG_QuickStart should be replaced with your consumer group name, and localhost:9876 with your name server address list.

This command lists the clients that are currently connected for that consumer group.

You should see all your clients listed here. IP@PID is used to identify each consumer within a consumer group. If one consumer is missing from this list, you should check why it fails to connect. Network connectivity? Wrong name server list? Incorrect consumer group name? A code gotcha (forgetting to call the start() method after configuring)?

If all your consumers are listed, you can use the following command:

bash mqadmin consumerProgress -g CG_QuickStart -n localhost:9876

Again, CG_QuickStart and localhost:9876 should be replaced with your own values, as described above.

The output contains one row per queue, with the following columns:

Broker Offset: how many messages have been put into this queue;
Consumer Offset: how many messages have already been consumed;
Client IP: IP of the client that is currently responsible for consuming this queue;
Diff: number of messages not yet consumed (Broker Offset minus Consumer Offset);
LastTime: latest known time at which this queue was consumed.
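
As a small illustration of how these columns relate, here is a sketch in plain Java that models a row of the consumerProgress output, computes Diff, and counts how many queues each client IP owns. The class and method names (ConsumerProgressSketch, QueueProgress, queuesPerClient) are made up for this example and are not part of RocketMQ; the numbers are the ones from the consumerProgress output posted in this thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ConsumerProgressSketch {

    /** One row of consumerProgress output, reduced to the columns described above (hypothetical type). */
    static final class QueueProgress {
        final String topic;
        final int queueId;
        final long brokerOffset;
        final long consumerOffset;
        final String clientIp;

        QueueProgress(String topic, int queueId, long brokerOffset, long consumerOffset, String clientIp) {
            this.topic = topic;
            this.queueId = queueId;
            this.brokerOffset = brokerOffset;
            this.consumerOffset = consumerOffset;
            this.clientIp = clientIp;
        }

        /** Diff column: messages stored in the queue that have not been consumed yet. */
        long diff() {
            return brokerOffset - consumerOffset;
        }
    }

    /** Counts how many queues each client IP is currently responsible for. */
    static Map<String, Integer> queuesPerClient(List<QueueProgress> rows) {
        Map<String, Integer> counts = new TreeMap<>();
        for (QueueProgress row : rows) {
            counts.merge(row.clientIp, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Values copied from the consumerProgress output posted in this thread.
        List<QueueProgress> rows = Arrays.asList(
                new QueueProgress("%RETRY%mqperf_consumer_group", 0, 0L, 0L, "172.22.1.33"),
                new QueueProgress("mqperf", 0, 2539542L, 2276359L, "172.22.1.33"));

        for (QueueProgress row : rows) {
            System.out.println(row.topic + " queue " + row.queueId + " diff=" + row.diff());
        }
        // A client IP that never appears here owns no queue and therefore receives nothing.
        System.out.println(queuesPerClient(rows));
    }
}
```

For the mqperf row this gives 2539542 - 2276359 = 263183, which matches the Diff column in that output.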

If all your messages were delivered to the first queue specified by your producers, for sure only one consumer may consume messages. In CLUSTERING mode, each queue is consumed by only one consumer of the group at a time.
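
To make this concrete, here is a simplified model in plain Java of an averaged queue allocation. It is only a sketch and not RocketMQ's actual allocation code: the class name QueueAllocationSketch, the allocate method, sorting the consumer ids, and the 4-queue case are all assumptions made for illustration. The consumer ids are taken from the consumerConnection output posted in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class QueueAllocationSketch {

    /**
     * Returns the queue ids assigned to currentConsumer when queueCount queues are
     * split into contiguous, near-equal blocks among the (sorted) consumers.
     * Every queue goes to exactly one consumer, so with fewer queues than
     * consumers some consumers get nothing.
     */
    static List<Integer> allocate(int queueCount, List<String> consumerIds, String currentConsumer) {
        List<String> sorted = new ArrayList<>(consumerIds);
        Collections.sort(sorted); // assumption: all consumers agree on the same ordering

        List<Integer> assigned = new ArrayList<>();
        int index = sorted.indexOf(currentConsumer);
        if (index < 0 || queueCount <= 0) {
            return assigned;
        }

        int base = queueCount / sorted.size();      // queues every consumer gets
        int remainder = queueCount % sorted.size(); // first `remainder` consumers get one extra
        int size = base + (index < remainder ? 1 : 0);
        int start = index * base + Math.min(index, remainder);

        for (int queueId = start; queueId < start + size; queueId++) {
            assigned.add(queueId);
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("172.22.1.35@instance594", "172.22.1.33@instance667");

        // One queue: only one consumer can own it, the other stays idle.
        for (String consumer : consumers) {
            System.out.println("1 queue,  " + consumer + " -> " + allocate(1, consumers, consumer));
        }
        // Hypothetical 4-queue topic: both consumers get work.
        for (String consumer : consumers) {
            System.out.println("4 queues, " + consumer + " -> " + allocate(4, consumers, consumer));
        }
    }
}
```

In this model, a single queue always leaves the second consumer with an empty assignment, no matter how many messages are sent to that queue.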

If you observe that some queues are allocated to your second client, messages are pending consumption, and your second client still stays idle, do not hesitate to file a bug.

Zhanhui Li
