You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/01/23 12:25:37 UTC
[GitHub] [rocketmq] d4ksn opened a new issue #2621: Every time an orderly consumer gets started and assigned new queues, consumer delay (20 seconds at max) will happen
d4ksn opened a new issue #2621:
URL: https://github.com/apache/rocketmq/issues/2621
This is the RocketmqClient log my program printed out:
```
[2021-01-23 17:40:48.148] level:INFO thread:RebalanceService class:RocketmqClient msg:the message queue lock OK, my_consumer_group MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7] ex:
[2021-01-23 17:40:48.148] level:INFO thread:RebalanceService class:RocketmqClient msg:remove unnecessary messageQueue offset. group=my_consumer_group, mq=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], offsetTableSize=0 ex:
[2021-01-23 17:40:48.161] level:INFO thread:RebalanceService class:RocketmqClient msg:doRebalance, my_consumer_group, add a new mq, MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7] ex:
[2021-01-23 17:40:48.172] level:INFO thread:RebalanceService class:RocketmqClient msg:the message queue lock OK, my_consumer_group MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6] ex:
[2021-01-23 17:40:48.172] level:INFO thread:RebalanceService class:RocketmqClient msg:remove unnecessary messageQueue offset. group=my_consumer_group, mq=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], offsetTableSize=1 ex:
[2021-01-23 17:40:48.185] level:INFO thread:RebalanceService class:RocketmqClient msg:doRebalance, my_consumer_group, add a new mq, MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6] ex:
[2021-01-23 17:40:48.185] level:INFO thread:RebalanceService class:RocketmqClient msg:doRebalance, my_consumer_group, add a new pull request PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013] ex:
[2021-01-23 17:40:48.185] level:INFO thread:RebalanceService class:RocketmqClient msg:doRebalance, my_consumer_group, add a new pull request PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778] ex:
[2021-01-23 17:40:48.185] level:INFO thread:RebalanceService class:RocketmqClient msg:rebalanced result changed. allocateMessageQueueStrategyName=AVG, group=my_consumer_group, topic=my_topic, clientId=192.168.26.229@DefaultRocketMQListenerContainer_4, mqAllSize=8, cidAllSize=3, rebalanceResultSize=2, rebalanceResultSet=[MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6]] ex:
[2021-01-23 17:40:48.185] level:INFO thread:RebalanceService class:RocketmqClient msg:my_topic Rebalance changed, also update version: 1611394846526, 1611394848185 ex:
[2021-01-23 17:40:48.185] level:INFO thread:PullMessageService class:RocketmqClient msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013] ex:
[2021-01-23 17:40:48.185] level:INFO thread:PullMessageService class:RocketmqClient msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778] ex:
[2021-01-23 17:40:51.186] level:INFO thread:PullMessageService class:RocketmqClient msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013] ex:
[2021-01-23 17:40:51.187] level:INFO thread:PullMessageService class:RocketmqClient msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778] ex:
[2021-01-23 17:40:54.187] level:INFO thread:PullMessageService class:RocketmqClient msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013] ex:
[2021-01-23 17:40:54.188] level:INFO thread:PullMessageService class:RocketmqClient msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778] ex:
[2021-01-23 17:40:57.188] level:INFO thread:PullMessageService class:RocketmqClient msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013] ex:
[2021-01-23 17:40:57.188] level:INFO thread:PullMessageService class:RocketmqClient msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778] ex:
[2021-01-23 17:41:00.189] level:INFO thread:PullMessageService class:RocketmqClient msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013] ex:
[2021-01-23 17:41:00.189] level:INFO thread:PullMessageService class:RocketmqClient msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778] ex:
[2021-01-23 17:41:03.190] level:INFO thread:PullMessageService class:RocketmqClient msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013] ex:
[2021-01-23 17:41:03.190] level:INFO thread:PullMessageService class:RocketmqClient msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778] ex:
[2021-01-23 17:41:06.191] level:INFO thread:PullMessageService class:RocketmqClient msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013] ex:
[2021-01-23 17:41:06.192] level:INFO thread:PullMessageService class:RocketmqClient msg:pull message later because not locked in broker, PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778] ex:
[2021-01-23 17:41:07.997] level:INFO thread:ConsumeMessageScheduledThread_1 class:RocketmqClient msg:the message queue locked OK, Group: my_consumer_group MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7] ex:
[2021-01-23 17:41:07.997] level:INFO thread:ConsumeMessageScheduledThread_1 class:RocketmqClient msg:the message queue locked OK, Group: my_consumer_group MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6] ex:
[2021-01-23 17:41:09.222] level:INFO thread:PullMessageService class:RocketmqClient msg:the first time to pull message, so fix offset from broker. pullRequest: PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], nextOffset=2542013] NewOffset: 2542013 brokerBusy: false ex:
[2021-01-23 17:41:09.235] level:INFO thread:PullMessageService class:RocketmqClient msg:the first time to pull message, so fix offset from broker. pullRequest: PullRequest [consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], nextOffset=5950778] NewOffset: 5950778 brokerBusy: false ex:
```
As seen from above, at 17:40:48, the two queues already "lock OK" first, and "add a new pull request", but then it said "pull message later because not locked in broker", so it went into a 3-second retry loop (but still failed like before).
Almost 20 seconds later at 17:41:07, it finally said "the message queue locked OK", and started consuming messages normally.
So, in the 17:40:48-17:41:07 time range, the messages in these 2 queues were kept in the broker, not delivered to the consumer, and caused a 20-second consumer delay.
If you look into the code and debug, you will find that:
In the first `RebalanceImpl#lock` invocation, the `processQueueTable` doesn't contain the entry for the new queue yet, so `processQueue.setLocked(true);` is not invoked.
`processQueueTable.putIfAbsent(mq, pq);` is called only after `RebalanceImpl#lock` has returned.
However, besides `RebalanceImpl#lock`, we also invoke `RebalanceImpl#lockAll` periodically, at a 20-second interval by default. So, `processQueue.setLocked(true);` finally get invoked after 20 seconds. But actually, the queue is already locked in the broker 20 seconds before.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org