Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/03/07 10:04:16 UTC

[GitHub] [rocketmq] leiy88 opened a new issue #3937: Some ProcessQueue blocked and cleanExpireMessage thread stopped

leiy88 opened a new issue #3937:
URL: https://github.com/apache/rocketmq/issues/3937


   **BUG REPORT**
   
   1. Please describe the issue you observed:
   
   - What did you do (The steps to reproduce)?
   Hard to reproduce.
   
   2. Please tell us about your environment:
   - JDK 1.8 running on CentOS 7
   - rocketmq-client version 4.8.0
   - Debugged with Arthas
   
   3. Other information (e.g. detailed explanation, logs, related issues, suggestions on how to fix, etc.):
   - Issue
   The consumerOffset of some queues stops advancing, e.g. queue 12 and queue 14:
   ![mqp](https://user-images.githubusercontent.com/19524201/157005726-4ab5590a-c892-4574-bd7b-6de9f9a305d7.png)
   - Analysis
   
   Pull requests keep going, and I can find client log entries like this:
   
   ```
   WARN RocketmqClient - the queue's messages, span too long, so do flow control, minOffset=668629, maxOffset=669630, maxSpan=1001
   ```
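   The span check behind this warning can be sketched as follows. This is a simplified model, not the RocketMQ client source: in-flight messages sit in an offset-ordered `TreeMap`, and pulling is postponed while `lastKey - firstKey` exceeds a threshold. The class name and the threshold of 1000 are assumptions; 1000 is just a value that the logged span of 1001 exceeds, since the actual configured limit is not stated in this report.

   ```java
   import java.util.TreeMap;

   // Hypothetical, simplified model of span-based flow control; not the RocketMQ API.
   final class SpanFlowControlSketch {
       // In-flight messages keyed by queue offset (payload simplified to a String).
       private final TreeMap<Long, String> inFlight = new TreeMap<>();
       private final long maxSpanThreshold; // assumed limit, supplied by the caller

       SpanFlowControlSketch(long maxSpanThreshold) {
           this.maxSpanThreshold = maxSpanThreshold;
       }

       void put(long offset, String msg) {
           inFlight.put(offset, msg);
       }

       void remove(long offset) {
           inFlight.remove(offset);
       }

       // Distance between the oldest and newest buffered offsets; 0 when empty.
       long span() {
           return inFlight.isEmpty() ? 0 : inFlight.lastKey() - inFlight.firstKey();
       }

       // True when the next pull should be postponed.
       boolean shouldFlowControl() {
           return span() > maxSpanThreshold;
       }

       public static void main(String[] args) {
           SpanFlowControlSketch pq = new SpanFlowControlSketch(1000);
           // Offsets taken from the log line above.
           pq.put(668629L, "stuck message, never removed");
           pq.put(669630L, "newest pulled message");
           System.out.println("span=" + pq.span() + ", flowControl=" + pq.shouldFlowControl());
           // prints "span=1001, flowControl=true"
       }
   }
   ```

   In this model, as long as the head entry at 668629 is never removed the span cannot shrink, so every retry of the pull is deferred again.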
   Debugging with Arthas confirms that pulling keeps going and nextOffset is up to date:
   ```
   ognl '@SpringUtils@getBean("xxxSubscriber").consumer.defaultMQPushConsumerImpl.mQClientFactory.pullMessageService.pullRequestQueue.take()' -c 5674cd4d
   @PullRequest[
     consumerGroup=@String[xxx],
     messageQueue=@MessageQueue[MessageQueue [topic=xxx, brokerName=broker3, queueId=30]],
     processQueue=@ProcessQueue[org.apache.rocketmq.client.impl.consumer.ProcessQueue@2386cd61],
     nextOffset=@Long[669749],
     lockedFirst=@Boolean[false],
   ]
   ```
   The consumer thread pool is not busy either:
   ```
   ognl '@SpringUtils@getBean("xxx").consumer.defaultMQPushConsumerImpl.consumeMessageService.consumeExecutor' -c 5674cd4d

   consumeExecutor=@ThreadPoolExecutor[java.util.concurrent.ThreadPoolExecutor@320f074e[Running, pool size = 20, active threads = 0, queued tasks = 0, completed tasks = 744439]]
   ```
   Then I took a look at the ProcessQueue: some expired messages were stuck in it and would never be cleaned.
   ```
   ognl '@SpringUtils@getBean("xxx").consumer.defaultMQPushConsumerImpl.rebalanceImpl.processQueueTable' -c 2f2c9b19 -x 3 > ~/pq
   ```
   Here is what is in the TreeMap:
   ```
   @Long[683527]:@MessageClientExt[MessageExt [queueId=17, storeSize=1426, queueOffset=683527, sysFlag=0, bornTimestamp=1646575741924, bornHost=/192.168.112.9:14371, storeTimestamp=1646575741976, storeHost=/192.168.109.40:10912, msgId=C0A86D2800002AA000000738BECAA2F4, commitLogOffset=7940300514036, bodyCRC=1812790343, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='xxx', flag=0, properties={MIN_OFFSET=679961, MAX_OFFSET=684744, KEYS=998013830434914562, UNIQ_KEY=C0A870095FEF5674CD4D1E808FE4F79C, CLUSTER=Cluster2}
   ```
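   A second simplified model shows why one stuck entry like offset 683527 holds back the whole queue's consumerOffset. It assumes, as in the concurrent consumer, that after a batch is removed the offset to commit is the smallest offset still buffered, or the highest offset seen plus one once the buffer is empty. The class and method names are hypothetical, and this is not a copy of `ProcessQueue.removeMessage`.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.TreeMap;

   // Hypothetical, simplified model of commit-offset calculation; not the RocketMQ source.
   final class CommitOffsetSketch {
       private final TreeMap<Long, String> buffered = new TreeMap<>();
       private long maxOffsetSeen = -1;

       void put(long offset, String msg) {
           buffered.put(offset, msg);
           maxOffsetSeen = Math.max(maxOffsetSeen, offset);
       }

       // Removes consumed offsets and returns the offset that may be committed,
       // or -1 if nothing was buffered.
       long removeAndGetCommitOffset(List<Long> consumedOffsets) {
           if (buffered.isEmpty()) {
               return -1;
           }
           for (Long off : consumedOffsets) {
               buffered.remove(off);
           }
           // A head entry that is never removed pins the commit offset at its own offset.
           return buffered.isEmpty() ? maxOffsetSeen + 1 : buffered.firstKey();
       }

       public static void main(String[] args) {
           CommitOffsetSketch pq = new CommitOffsetSketch();
           pq.put(683527L, "stuck, never removed");
           pq.put(683528L, "consumed");
           pq.put(683529L, "consumed");
           // Later messages are consumed, yet the offset to commit stays at 683527.
           System.out.println(pq.removeAndGetCommitOffset(Arrays.asList(683528L, 683529L))); // prints 683527
       }
   }
   ```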
   Why are the expired messages still here? Looking at cleanExpireMsgExecutors, there are no queued tasks: the clean task has stopped!
   ```
   ognl '@SpringUtils@getBean("xxx").consumer.defaultMQPushConsumerImpl.consumeMessageService.cleanExpireMsgExecutors' -c 5674cd4d
   @DelegatedScheduledExecutorService[
       e=@ScheduledThreadPoolExecutor[java.util.concurrent.ScheduledThreadPoolExecutor@e419b6a[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 13555]],
       e=@ScheduledThreadPoolExecutor[java.util.concurrent.ScheduledThreadPoolExecutor@e419b6a[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 13555]],
       $assertionsDisabled=@Boolean[true],
   ]
   ```
   So I tried to trigger cleanExpireMsg manually through Arthas. It worked: after doing this, the consumerOffset caught up with the broker offset.
   ```
   ognl '@SpringUtils@getBean("newSmsCodeSubscriber").consumer.defaultMQPushConsumerImpl.consumeMessageService.cleanExpireMsg()' -c 5674cd4d
   ```
   But after doing this several times, I hit this exception:
   ```
   Caused by: java.lang.NumberFormatException: null
           at java.lang.Long.parseLong(Long.java:552)
           at java.lang.Long.parseLong(Long.java:631)
           at org.apache.rocketmq.client.impl.consumer.ProcessQueue.cleanExpiredMsg(ProcessQueue.java:87)
           at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.cleanExpireMsg(ConsumeMessageConcurrentlyService.java:270)
   ```
   Looking back at the message in the TreeMap, it has no CONSUME_START_TIME property!
   So ProcessQueue.cleanExpiredMsg throws a NumberFormatException, which makes cleanExpireMsgExecutors stop working!
   <img width="1106" alt="mqe" src="https://user-images.githubusercontent.com/19524201/157010049-ebba991f-0d9f-4d74-a0f4-efdde48a8737.png">
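   The failure mode can be reproduced with plain `java.util.concurrent`. If any run of a task scheduled with `scheduleAtFixedRate` throws, later runs are suppressed, while the executor itself stays `Running` with no queued task, which matches the Arthas output above. In the sketch below, a `null` string stands in for the missing CONSUME_START_TIME property. The class name and timings are illustrative.

   ```java
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical demo: one uncaught exception silently ends a periodic task.
   final class StoppedCleanerDemo {
       static int runUntilFailure(int failOnRun) {
           ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
           AtomicInteger runs = new AtomicInteger();
           ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
               int n = runs.incrementAndGet();
               // Simulates reading a message property that was never set.
               String consumeStartTime = (n == failOnRun) ? null : "1646575741976";
               Long.parseLong(consumeStartTime); // throws NumberFormatException on null
           }, 0, 10, TimeUnit.MILLISECONDS);
           try {
               // A periodic task only completes by cancellation or by throwing.
               future.get();
           } catch (ExecutionException e) {
               System.out.println("task stopped: " + e.getCause().getClass().getSimpleName());
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           } finally {
               executor.shutdownNow();
           }
           return runs.get();
       }

       public static void main(String[] args) {
           // prints "task stopped: NumberFormatException", then "runs before stop: 3"
           System.out.println("runs before stop: " + runUntilFailure(3));
       }
   }
   ```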
   
   
   But I still don't know why ProcessQueue.removeMessage in processConsumeResult is not working.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] ni-ze commented on issue #3937: Some ProcessQueue blocked and cleanExpireMessage thread stopped

Posted by GitBox <gi...@apache.org>.
ni-ze commented on issue #3937:
URL: https://github.com/apache/rocketmq/issues/3937#issuecomment-1061316820


   ![image](https://user-images.githubusercontent.com/31175234/157148223-acb126b8-011c-44fc-9de6-a2b85241fd97.png)
   This bug is fixed in develop: the Throwable is now caught. The clean thread stopped because of an uncaught error.
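   The guard described here can be sketched minimally, assuming the fix is simply to catch `Throwable` inside the periodic task. The exact code in develop may differ. The error is recorded and the schedule survives. The failure on every third run is a hypothetical stand-in for a message missing CONSUME_START_TIME.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical demo: catching Throwable keeps a periodic cleaner alive.
   final class GuardedCleanerDemo {
       // Runs the guarded task until it has executed targetRuns times; returns caught failures.
       static int runGuarded(int targetRuns) {
           AtomicInteger runs = new AtomicInteger();
           AtomicInteger failures = new AtomicInteger();
           CountDownLatch done = new CountDownLatch(targetRuns);
           ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
           executor.scheduleAtFixedRate(() -> {
               try {
                   // Stand-in for cleanExpireMsg(): every third run hits a null property.
                   String consumeStartTime = (runs.incrementAndGet() % 3 == 0) ? null : "1646575741976";
                   Long.parseLong(consumeStartTime);
               } catch (Throwable t) {
                   failures.incrementAndGet(); // real code would log the error here
               } finally {
                   done.countDown();
               }
           }, 0, 10, TimeUnit.MILLISECONDS);
           try {
               if (!done.await(5, TimeUnit.SECONDS)) {
                   throw new IllegalStateException("periodic task stopped early");
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           } finally {
               executor.shutdownNow();
           }
           return failures.get();
       }

       public static void main(String[] args) {
           System.out.println("failures caught: " + runGuarded(10));
       }
   }
   ```

   Catching `Throwable` this broadly keeps the cleaner alive, but the caught error still has to be logged. Otherwise a bug like the missing property goes unnoticed again.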



[GitHub] [rocketmq] leiy88 commented on issue #3937: Some ProcessQueue blocked and cleanExpireMessage thread stopped

Posted by GitBox <gi...@apache.org>.
leiy88 commented on issue #3937:
URL: https://github.com/apache/rocketmq/issues/3937#issuecomment-1061340243


   > @leiy88 It's hard to say. You can check whether there is an error in rocketmq_client.log.
   
   Nope, nothing in it. I checked it before.



[GitHub] [rocketmq] leiy88 commented on issue #3937: Some ProcessQueue blocked and cleanExpireMessage thread stopped

Posted by GitBox <gi...@apache.org>.
leiy88 commented on issue #3937:
URL: https://github.com/apache/rocketmq/issues/3937#issuecomment-1061335050


   > ![image](https://user-images.githubusercontent.com/31175234/157148223-acb126b8-011c-44fc-9de6-a2b85241fd97.png) This bug is fixed in develop: the Throwable is now caught. The clean thread stopped because of an uncaught error.
   
   👏 Could you help explain in what situations removeMsg can fail, leaving expired messages that block the consumption progress?



[GitHub] [rocketmq] aaron-ai commented on issue #3937: Some ProcessQueue blocked and cleanExpireMessage thread stopped

Posted by GitBox <gi...@apache.org>.
aaron-ai commented on issue #3937:
URL: https://github.com/apache/rocketmq/issues/3937#issuecomment-1061340286


   Nice catch.



[GitHub] [rocketmq] ni-ze commented on issue #3937: Some ProcessQueue blocked and cleanExpireMessage thread stopped

Posted by GitBox <gi...@apache.org>.
ni-ze commented on issue #3937:
URL: https://github.com/apache/rocketmq/issues/3937#issuecomment-1061337859


   @leiy88 It's hard to say. You can check whether there is an error in rocketmq_client.log.



[GitHub] [rocketmq] chenzlalvin commented on issue #3937: Some ProcessQueue blocked and cleanExpireMessage thread stopped

Posted by GitBox <gi...@apache.org>.
chenzlalvin commented on issue #3937:
URL: https://github.com/apache/rocketmq/issues/3937#issuecomment-1062437626


   Compared with the fully managed consumption of PushConsumer, RocketMQ should provide a consumption mechanism in which the application actively fetches messages and commits them, like AWS SQS, so that asynchronous control can be done more flexibly.

