Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/07/14 02:15:48 UTC

[GitHub] [rocketmq] ozw999 opened a new issue #3143: MQClientInstance has a risk of blocking during persistAllConsumerOffset()

ozw999 opened a new issue #3143:
URL: https://github.com/apache/rocketmq/issues/3143


   **BUG REPORT**
   
   1. Please describe the issue you observed:
   I found a problem:
   when a network exception occurs on my cluster (2 masters, 2 slaves), the `updateTopicRouteInfoFromNameServer()` task is not performed on time.
   Why does `MQClientInstance` use a single `Executors.newSingleThreadScheduledExecutor()` to do so many things, such as:
   > ```
   > updateTopicRouteInfoFromNameServer();
   > sendHeartbeatToAllBrokerWithLock();
   > persistAllConsumerOffset();
   > ```
   From my observations, this happens when a network exception occurs while `persistAllConsumerOffset()` is executing, which persists each queue's offset through the following method:
   ```java
       @Override
       public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
           MQBrokerException, InterruptedException, MQClientException {
           FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
           if (null == findBrokerResult) {
   
               this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
               findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
           }
   
           if (findBrokerResult != null) {
               UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
               requestHeader.setTopic(mq.getTopic());
               requestHeader.setConsumerGroup(this.groupName);
               requestHeader.setQueueId(mq.getQueueId());
               requestHeader.setCommitOffset(offset);
   
               if (isOneway) {
                   this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                       findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
               } else {
                   this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                       findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
               }
           } else {
               throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
           }
       }
   ```
   At this moment, `this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())` does not return null: because everything runs on a single thread, the `updateTopicRouteInfoFromNameServer()` task is still waiting, so the cached broker address is stale.
   In fact, the broker behind `findBrokerResult` is no longer reachable, so every queue costs 5 seconds waiting on the `1000 * 5` ms request timeout passed with `findBrokerResult.getBrokerAddr()` and `requestHeader`.
   Meanwhile, the consumer cannot perform `rebalance` correctly, because its topic route info is out of date.
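   The head-of-line blocking described above can be reproduced outside RocketMQ with a plain single-thread `ScheduledExecutorService`. This is a toy model under stated assumptions, not RocketMQ code: `slowPersistAll` is a hypothetical stand-in for `persistAllConsumerOffset()` in which each queue "times out" after a short sleep (milliseconds instead of 5 s, to keep it fast), and the second task stands in for `updateTopicRouteInfoFromNameServer()`.
   
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   
   class SingleThreadBlockingDemo {
   
       // Hypothetical stand-in for persistAllConsumerOffset(): one blocking call per queue,
       // each waiting out a (scaled-down) request timeout because the broker is unreachable.
       static void slowPersistAll(int queues, long perQueueTimeoutMillis) throws InterruptedException {
           for (int i = 0; i < queues; i++) {
               Thread.sleep(perQueueTimeoutMillis);
           }
       }
   
       /** Returns how late (ms) a route-update task started compared with when it was due. */
       static long measureRouteUpdateDelay(int queues, long perQueueTimeoutMillis) throws InterruptedException {
           ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor();
           CountDownLatch ran = new CountDownLatch(1);
           long[] startedAt = new long[1];
   
           // The offset-persist round is scheduled first and occupies the only thread...
           single.schedule(() -> {
               try {
                   slowPersistAll(queues, perQueueTimeoutMillis);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
           }, 0, TimeUnit.MILLISECONDS);
   
           // ...so a route update due 10 ms later has to wait until the whole round finishes.
           long dueAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10);
           single.schedule(() -> {
               startedAt[0] = System.nanoTime();
               ran.countDown();
           }, 10, TimeUnit.MILLISECONDS);
   
           ran.await();
           single.shutdownNow();
           return TimeUnit.NANOSECONDS.toMillis(startedAt[0] - dueAt);
       }
   
       public static void main(String[] args) throws InterruptedException {
           System.out.println("route update started ~" + measureRouteUpdateDelay(10, 50) + " ms late");
       }
   }
   ```
   
   With 10 queues at 50 ms each, the route update starts roughly half a second late; the delay grows linearly with the number of queues whose commits time out.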
   
   2. Please tell us about your environment:
   RocketMQ version is 4.4.0.
   
   3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):
   Why does `MQClientInstance` use a single `Executors.newSingleThreadScheduledExecutor()` to do so many things?
   Could multiple threads be used to execute them instead?
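   One hypothetical way to use more than one thread is to give offset persistence its own scheduler, so that a slow persist round cannot delay route updates. The sketch below is an illustration only, not the actual `MQClientInstance` code: `routeScheduler` and `offsetScheduler` are invented names, the tasks are toy stand-ins (sleeps instead of network calls), and it deliberately ignores the question of which shared state the tasks touch.
   
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   
   class SplitSchedulerDemo {
   
       /** Returns how late (ms) the route-update task started when persistence runs on its own thread. */
       static long measureRouteUpdateDelay(int queues, long perQueueTimeoutMillis) throws InterruptedException {
           // Hypothetical split: route/heartbeat tasks on one thread, offset persistence on another.
           ScheduledExecutorService routeScheduler = Executors.newSingleThreadScheduledExecutor();
           ScheduledExecutorService offsetScheduler = Executors.newSingleThreadScheduledExecutor();
           CountDownLatch ran = new CountDownLatch(1);
           long[] startedAt = new long[1];
   
           // Slow persist round: one (scaled-down) timeout per queue, but on its own thread.
           offsetScheduler.schedule(() -> {
               try {
                   for (int i = 0; i < queues; i++) {
                       Thread.sleep(perQueueTimeoutMillis);
                   }
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
           }, 0, TimeUnit.MILLISECONDS);
   
           // The route update is no longer queued behind the persist round.
           long dueAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10);
           routeScheduler.schedule(() -> {
               startedAt[0] = System.nanoTime();
               ran.countDown();
           }, 10, TimeUnit.MILLISECONDS);
   
           ran.await();
           routeScheduler.shutdownNow();
           offsetScheduler.shutdownNow(); // interrupts the still-sleeping persist round
           return TimeUnit.NANOSECONDS.toMillis(startedAt[0] - dueAt);
       }
   
       public static void main(String[] args) throws InterruptedException {
           System.out.println("route update started ~" + measureRouteUpdateDelay(10, 50) + " ms late");
       }
   }
   ```
   
   The trade-off is that the two threads now run concurrently, so any state both tasks read and write has to be safe for concurrent access.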


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] caigy commented on issue #3143: MQClientInstance has a risk of blocking during persistAllConsumerOffset()

Posted by GitBox <gi...@apache.org>.
caigy commented on issue #3143:
URL: https://github.com/apache/rocketmq/issues/3143#issuecomment-883127813


   There are 5 tasks in `org.apache.rocketmq.client.impl.factory.MQClientInstance#scheduledExecutorService`, and they are executed sequentially, just as described in the question. After reading the related code, I found that these tasks read and write some shared state, including but not limited to:
   
   - `org.apache.rocketmq.client.impl.factory.MQClientInstance#brokerAddrTable`
   - `org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteTable`
   - `persistAllConsumerOffset()` also calls `updateTopicRouteInfoFromNameServer()`
   - etc.
   
   More careful work is needed to make them run concurrently in a way that is both safe and efficient.





[GitHub] [rocketmq] ozw999 commented on issue #3143: MQClientInstance has a risk of blocking during persistAllConsumerOffset()

Posted by GitBox <gi...@apache.org>.
ozw999 commented on issue #3143:
URL: https://github.com/apache/rocketmq/issues/3143#issuecomment-882162513


   > So, what's your advice?
   
   When you try to speed up disaster-recovery failover, you will find that this problem holds you back:
   your MQ will not work properly for two minutes.
   My suggestion is to use a separate thread to run `persistAllConsumerOffset()`.
   **But I need to find out why the developers designed it this way.**





[GitHub] [rocketmq] zongtanghu commented on issue #3143: MQClientInstance has a risk of blocking during persistAllConsumerOffset()

Posted by GitBox <gi...@apache.org>.
zongtanghu commented on issue #3143:
URL: https://github.com/apache/rocketmq/issues/3143#issuecomment-882071981


   So, what's your advice?





[GitHub] [rocketmq] lizhimins commented on issue #3143: MQClientInstance has a risk of blocking during persistAllConsumerOffset()

Posted by GitBox <gi...@apache.org>.
lizhimins commented on issue #3143:
URL: https://github.com/apache/rocketmq/issues/3143#issuecomment-884859696


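   Judging from the code, this can indeed happen. The worst outcome of a single failed offset commit is a small number of messages being consumed again. If offsets were committed concurrently and asynchronously, wouldn't this problem go away?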
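   To see why concurrent commits would help, compare the wall-clock cost of N blocking commits issued one after another with the same N commits issued concurrently. This is a sketch under stated assumptions: `commitOffset` is a hypothetical stub that just sleeps for a scaled-down timeout, not the RocketMQ client API, and the pool is sized to the number of queues purely for the demo.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   
   class ConcurrentCommitDemo {
   
       // Hypothetical stub: a commit to an unreachable broker that blocks until its timeout expires.
       static void commitOffset(int queueId, long timeoutMillis) {
           try {
               Thread.sleep(timeoutMillis);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
       }
   
       /** One commit after another: total time is roughly queues * timeout. */
       static long sequentialMillis(int queues, long timeoutMillis) {
           long start = System.nanoTime();
           for (int q = 0; q < queues; q++) {
               commitOffset(q, timeoutMillis);
           }
           return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
       }
   
       /** All commits in flight at once: total time is roughly one timeout. */
       static long concurrentMillis(int queues, long timeoutMillis) {
           ExecutorService pool = Executors.newFixedThreadPool(queues);
           long start = System.nanoTime();
           List<CompletableFuture<Void>> commits = new ArrayList<>();
           for (int q = 0; q < queues; q++) {
               final int queueId = q;
               commits.add(CompletableFuture.runAsync(() -> commitOffset(queueId, timeoutMillis), pool));
           }
           // Wait for every commit; the total is bounded by the slowest one, not the sum.
           CompletableFuture.allOf(commits.toArray(new CompletableFuture[0])).join();
           long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
           pool.shutdown();
           return elapsed;
       }
   
       public static void main(String[] args) {
           System.out.println("sequential: ~" + sequentialMillis(10, 50) + " ms");
           System.out.println("concurrent: ~" + concurrentMillis(10, 50) + " ms");
       }
   }
   ```
   
   Note that the caller still waits for `allOf(...).join()`, so the scheduler thread is held for about one timeout rather than N of them.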





[GitHub] [rocketmq] ozw999 edited a comment on issue #3143: MQClientInstance has a risk of blocking during persistAllConsumerOffset()

Posted by GitBox <gi...@apache.org>.
ozw999 edited a comment on issue #3143:
URL: https://github.com/apache/rocketmq/issues/3143#issuecomment-885331059


   Yes. Every consumer offset persistence call eventually goes, one after another, through `org.apache.rocketmq.remoting.netty#invokeSync()`, with a 5 s timeout. This causes terrible blocking.
   ```java
   if (isOneway) {
       this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
           findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
   } else {
       this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
           findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
   }
   ```
   Switching to asynchronous calls (`org.apache.rocketmq.remoting.netty#invokeAsync()`) seems to be a better solution.
   @duhenglucky I think this is more than just a "question".
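   The asynchronous style suggested here can be sketched with the JDK alone (Java 9+ for `CompletableFuture.orTimeout`). In this hypothetical model, which is not RocketMQ's `invokeAsync()` or its callback types, `persistAllAsync` starts one commit per queue and returns immediately, reporting each outcome through a callback, so the thread that called it would be free to run `updateTopicRouteInfoFromNameServer()` next. `OffsetCommitCallback`, `commitAsync`, and `runDemo` are invented names.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   import java.util.concurrent.atomic.AtomicInteger;
   
   class AsyncCommitDemo {
   
       /** Hypothetical callback type for illustration only. */
       interface OffsetCommitCallback {
           void onComplete(int queueId, Throwable error); // error == null means the commit succeeded
       }
   
       // Hypothetical stub for an async commit to an unreachable broker: no response ever
       // arrives, so the future is only completed (exceptionally) when the timeout fires.
       static CompletableFuture<Void> commitAsync(int queueId, long timeoutMillis) {
           return new CompletableFuture<Void>().orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
       }
   
       /** Starts one commit per queue and returns at once; outcomes arrive via the callback. */
       static void persistAllAsync(int queues, long timeoutMillis, OffsetCommitCallback callback) {
           for (int q = 0; q < queues; q++) {
               final int queueId = q;
               commitAsync(queueId, timeoutMillis).whenComplete((ignored, err) -> callback.onComplete(queueId, err));
           }
       }
   
       /** Returns {ms until persistAllAsync returned, number of commits that timed out}. */
       static long[] runDemo(int queues, long timeoutMillis) throws InterruptedException {
           CountDownLatch finished = new CountDownLatch(queues);
           AtomicInteger timedOut = new AtomicInteger();
           long start = System.nanoTime();
           persistAllAsync(queues, timeoutMillis, (queueId, err) -> {
               if (err instanceof TimeoutException) {
                   timedOut.incrementAndGet(); // a real client might log and retry on the next round
               }
               finished.countDown();
           });
           long returnedAfter = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
           finished.await(); // only the demo waits here; a scheduler thread would not
           return new long[] {returnedAfter, timedOut.get()};
       }
   
       public static void main(String[] args) throws InterruptedException {
           long[] r = runDemo(10, 1000);
           System.out.println("returned after ~" + r[0] + " ms, timed out: " + r[1]);
       }
   }
   ```
   
   The cost of this design is that failures are only observed later, in the callback, so the caller can no longer simply catch an exception from the commit.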





[GitHub] [rocketmq] ozw999 commented on issue #3143: MQClientInstance has a risk of blocking during persistAllConsumerOffset()

Posted by GitBox <gi...@apache.org>.
ozw999 commented on issue #3143:
URL: https://github.com/apache/rocketmq/issues/3143#issuecomment-884776782


   I think my English may not be good enough to express myself fully.
   Forgive me for writing the rest in Chinese.
   
   @caigy 
   > * `org.apache.rocketmq.client.impl.factory.MQClientInstance#brokerAddrTable`
   > * `org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteTable`
   
   Both maps above are `ConcurrentMap`s, so there should not be any concurrency problem.
   > * `persistAllConsumerOffset()` also calling `updateTopicRouteInfoFromNameServer()`
   
   I think you are referring to:
   ```java
       FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
       if (null == findBrokerResult) {
           this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
           findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
       }
   ```
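   Reaching this branch requires the data currently in `brokerAddrTable` to already be up to date. For example, if broker-b happens to suffer a network failure while `persistAllConsumerOffset()` is running, `brokerAddrTable` has not been updated yet, so `findBrokerResult` still gets a value and the `if` branch is never entered.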
   
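   The problem I am facing is this: while running disaster-recovery tests on a RocketMQ cluster with two masters and two slaves, I found that after unplugging the network cable of one server (hosting master A and slave B), it took about two minutes before MQ could resume normal producing and consuming. That is intolerable for enterprise applications, so I tried shortening some of the timeout thresholds in the source code, for example: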
   * org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#BROKER_CHANNEL_EXPIRED_TIME
   * org.apache.rocketmq.broker.client.ProducerManager#CHANNEL_EXPIRED_TIMEOUT
   * org.apache.rocketmq.broker.client.ConsumerManager#CHANNEL_EXPIRED_TIMEOUT
   and so on.
   
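   But the next problem was that the recovery time was sometimes fast and sometimes slow, so I added some debug output and found that the slow recovery was mainly caused by the following:
   If the server loses its network connection exactly while the single-thread pool in `MQClientInstance` is running `persistAllConsumerOffset()`, then, because I have a large number of queues that must be persisted one by one, other scheduled tasks such as `updateTopicRouteInfoFromNameServer()` are not executed for a long time (about 2 minutes). During this period the consumer's rebalance cannot run correctly, so consumption cannot recover.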
   **This execution model in the source code greatly limits how fast RocketMQ can recover from a failure.**
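   A rough back-of-the-envelope estimate shows how the stall scales. Assuming every offset commit to the unreachable broker waits for the full `1000 * 5` ms timeout and that commits are issued strictly one at a time on the single scheduler thread, the time for which that thread is blocked is approximately:
   
   $$
   T_{\text{blocked}} \approx N_{\text{queues}} \times t_{\text{timeout}} = N_{\text{queues}} \times 5\,\text{s}
   $$
   
   For the observed stall of about 2 minutes this corresponds to $N_{\text{queues}} \approx 120\,\text{s} / 5\,\text{s} = 24$ timed-out commits. The actual queue count is not stated here, so this is only an order-of-magnitude check.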
   
   So, if the scheduled tasks in `MQClientInstance` can be split up, I suggest moving `persistAllConsumerOffset()` out and running it on its own.

