Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/01/23 12:25:37 UTC

[GitHub] [rocketmq] d4ksn opened a new issue #2621: Every time an orderly consumer gets started and assigned new queues, consumer delay (20 seconds at max) will happen

d4ksn opened a new issue #2621:
URL: https://github.com/apache/rocketmq/issues/2621


   This is the RocketmqClient log my program printed out:
   ```
   [2021-01-23 17:40:48.148]	level:INFO		thread:RebalanceService	class:RocketmqClient		msg:the message queue lock OK, my_consumer_group MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7]	ex:
   [2021-01-23 17:40:48.148]	level:INFO		thread:RebalanceService	class:RocketmqClient		msg:remove unnecessary messageQueue offset. group=my_consumer_group, mq=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], offsetTableSize=0	ex:
   [2021-01-23 17:40:48.161]	level:INFO		thread:RebalanceService	class:RocketmqClient		msg:doRebalance, my_consumer_group, add a new mq, MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7]	ex:
   [2021-01-23 17:40:48.172]	level:INFO		thread:RebalanceService	class:RocketmqClient		msg:the message queue lock OK, my_consumer_group MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6]	ex:
   [2021-01-23 17:40:48.172]	level:INFO		thread:RebalanceService	class:RocketmqClient		msg:remove unnecessary messageQueue offset. group=my_consumer_group, mq=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], offsetTableSize=1	ex:
   [2021-01-23 17:40:48.185]	level:INFO		thread:RebalanceService	class:RocketmqClient		msg:doRebalance, my_consumer_group, add a new mq, MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6]	ex:
   [2021-01-23 17:40:48.185]	level:INFO		thread:RebalanceService	class:RocketmqClient		msg:doRebalance, my_consumer_group, add a new pull request PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013]	ex:
   [2021-01-23 17:40:48.185]	level:INFO		thread:RebalanceService	class:RocketmqClient		msg:doRebalance, my_consumer_group, add a new pull request PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778]	ex:
   [2021-01-23 17:40:48.185]	level:INFO		thread:RebalanceService	class:RocketmqClient		msg:rebalanced result changed. allocateMessageQueueStrategyName=AVG, group=my_consumer_group, topic=my_topic, clientId=192.168.26.229@DefaultRocketMQListenerContainer_4, mqAllSize=8, cidAllSize=3, rebalanceResultSize=2, rebalanceResultSet=[MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6]]	ex:
   [2021-01-23 17:40:48.185]	level:INFO		thread:RebalanceService	class:RocketmqClient		msg:my_topic Rebalance changed, also update version: 1611394846526, 1611394848185	ex:
   [2021-01-23 17:40:48.185]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013]	ex:
   [2021-01-23 17:40:48.185]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778]	ex:
   [2021-01-23 17:40:51.186]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013]	ex:
   [2021-01-23 17:40:51.187]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778]	ex:
   [2021-01-23 17:40:54.187]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013]	ex:
   [2021-01-23 17:40:54.188]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778]	ex:
   [2021-01-23 17:40:57.188]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013]	ex:
   [2021-01-23 17:40:57.188]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778]	ex:
   [2021-01-23 17:41:00.189]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013]	ex:
   [2021-01-23 17:41:00.189]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778]	ex:
   [2021-01-23 17:41:03.190]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013]	ex:
   [2021-01-23 17:41:03.190]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778]	ex:
   [2021-01-23 17:41:06.191]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013]	ex:
   [2021-01-23 17:41:06.192]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778]	ex:
   [2021-01-23 17:41:07.997]	level:INFO		thread:ConsumeMessageScheduledThread_1	class:RocketmqClient		msg:the message queue locked OK, Group: my_consumer_group MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7]	ex:
   [2021-01-23 17:41:07.997]	level:INFO		thread:ConsumeMessageScheduledThread_1	class:RocketmqClient		msg:the message queue locked OK, Group: my_consumer_group MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6]	ex:
   [2021-01-23 17:41:09.222]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:the first time to pull message, so fix offset from broker. pullRequest: PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013] NewOffset: 2542013 brokerBusy: false	ex:
   [2021-01-23 17:41:09.235]	level:INFO		thread:PullMessageService	class:RocketmqClient		msg:the first time to pull message, so fix offset from broker. pullRequest: PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778] NewOffset: 5950778 brokerBusy: false	ex:
   ```
   
   As the log shows, at 17:40:48 both queues first report "the message queue lock OK" and then "add a new pull request". Immediately afterwards, however, PullMessageService logs "pull message later because not locked in broker" and retries every 3 seconds, failing the same way each time.
   Almost 20 seconds later, at 17:41:07, the client finally logs "the message queue locked OK", and then starts consuming messages normally.
   So, between 17:40:48 and 17:41:07, the messages in these two queues stayed in the broker and were not delivered to the consumer, which caused a consumer delay of about 20 seconds.
   
   If you step through the code in a debugger, you will find the following:
   On the first `RebalanceImpl#lock` invocation, `processQueueTable` does not yet contain an entry for the new queue, so `processQueue.setLocked(true);` is not invoked.
   `processQueueTable.putIfAbsent(mq, pq);` is called only after `RebalanceImpl#lock` has returned.
   Besides `RebalanceImpl#lock`, `RebalanceImpl#lockAll` is also invoked periodically, at a 20-second interval by default. So `processQueue.setLocked(true);` finally gets invoked up to 20 seconds later, even though the queue was already locked in the broker the whole time.
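
   The ordering problem described above can be reproduced in isolation. The following is a minimal sketch, not the actual RocketMQ source: `LockRaceSketch`, its `addQueue` method, the string queue key and the always-successful broker lock are assumptions made for illustration. Only the order of operations (lock first, `putIfAbsent` afterwards, periodic `lockAll` later) is taken from the description above.

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   // Simplified model of the ordering problem; NOT the real RocketMQ classes.
   class LockRaceSketch {

       /** Stand-in for the client-side process queue; only the locked flag is modelled. */
       static final class ProcessQueue {
           private volatile boolean locked = false;

           void setLocked(boolean locked) { this.locked = locked; }

           boolean isLocked() { return locked; }
       }

       // Hypothetical: queues are keyed by a plain string instead of a MessageQueue object.
       private final ConcurrentMap<String, ProcessQueue> processQueueTable = new ConcurrentHashMap<>();

       /** Models lock(mq): the local flag is set only if the table already has an entry. */
       boolean lock(String mq) {
           boolean lockedInBroker = true; // assumption: the broker always grants the lock
           ProcessQueue pq = processQueueTable.get(mq);
           if (lockedInBroker && pq != null) {
               pq.setLocked(true);
           }
           return lockedInBroker;
       }

       /** Models the periodic lockAll(): re-locks every queue that is already in the table. */
       void lockAll() {
           for (String mq : processQueueTable.keySet()) {
               lock(mq);
           }
       }

       /** Models the rebalance step for a new queue: lock first, register the entry afterwards. */
       ProcessQueue addQueue(String mq) {
           if (!lock(mq)) {
               return null;
           }
           ProcessQueue pq = new ProcessQueue();
           ProcessQueue previous = processQueueTable.putIfAbsent(mq, pq);
           return previous != null ? previous : pq;
       }

       public static void main(String[] args) {
           LockRaceSketch rebalance = new LockRaceSketch();
           ProcessQueue pq = rebalance.addQueue("my_topic@broker-a@7");
           // The broker lock succeeded, but the local flag was never set.
           System.out.println("after rebalance: locked=" + pq.isLocked()); // prints "after rebalance: locked=false"
           // Only the next periodic lockAll() sets the flag.
           rebalance.lockAll();
           System.out.println("after lockAll: locked=" + pq.isLocked());   // prints "after lockAll: locked=true"
       }
   }
   ```

   In this model the window between `addQueue` and the next `lockAll` corresponds to the period in which the log above shows "pull message later because not locked in broker".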
   
   

