Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/06/29 08:35:21 UTC

[GitHub] [rocketmq] henrypoter opened a new issue #2127: LitePullConsumer not work

henrypoter opened a new issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127


   see https://github.com/apache/rocketmq/issues/2110


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] lebron374 removed a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
lebron374 removed a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653702187


   > > > litePullConsumer
   > > 
   > > 
   > > litePullConsumer.start() should after litePullConsumer.setMessageQueueListener(mql)???
   > 
   > Both not work.
   > Below is the workaround:
   > `consumer.subscribe(topic, "*"); consumer.start(); consumer.poll(); consumer.setMessageQueueListener(mql);`
   
   @Override
           public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
               MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
               switch (messageModel) {
                   case BROADCASTING:
                       updateAssignedMessageQueue(topic, mqAll);
                       updatePullTask(topic, mqAll);
                       break;
                   case CLUSTERING:
                       updateAssignedMessageQueue(topic, mqDivided);
                       updatePullTask(topic, mqDivided);
                       break;
                   default:
                       break;
               }
           }
   





[GitHub] [rocketmq] lebron374 edited a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
lebron374 edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653702287


   > > > litePullConsumer
   > > 
   > > 
   > > litePullConsumer.start() should after litePullConsumer.setMessageQueueListener(mql)???
   > 
   > Both not work.
   > Below is the workaround:
   > `consumer.subscribe(topic, "*"); consumer.start(); consumer.poll(); consumer.setMessageQueueListener(mql);`
   
   The default MessageQueueListenerImpl looks like this. Can you spot any difference?
   
   ```
   org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.MessageQueueListenerImpl
   
       class MessageQueueListenerImpl implements MessageQueueListener {
           @Override
           public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
               MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
               switch (messageModel) {
                   case BROADCASTING:
                       updateAssignedMessageQueue(topic, mqAll);
                       updatePullTask(topic, mqAll);
                       break;
                   case CLUSTERING:
                       updateAssignedMessageQueue(topic, mqDivided);
                       updatePullTask(topic, mqDivided);
                       break;
                   default:
                       break;
               }
           }
       }
   ```





[GitHub] [rocketmq] henrypoter edited a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
henrypoter edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-655238278


   > > Testcase to reproduce the problem
   > > [LitePullConsumerSubscribe3.java.txt](https://github.com/apache/rocketmq/files/4856365/LitePullConsumerSubscribe3.java.txt)
   > > When we call litePullConsumer.setMessageQueueListener(mql) the consumer cannot poll( ) messages from broker.
   > 
   > Thanks for your details about this issue, and I read your code listed above, Rocketmq's subscribe() mode does not support custom MessageQueueListener, because in subscribe mode, pull consumer will automatically do load balancing. If you change this, it will not work. If you don't want rocketmq to manage load balancing, assign() mode will be a better choice.
   > 
   > But this setMessageQueueListener() is really best to be hidden from users, are you willing to submit a PR to optimize it?
   
   I think the MessageQueueListener should be exposed to application developers.
   I don't want to manage queue rebalancing myself, but I do want to be notified of queue rebalance events.
   
   The custom messageQueueListener is called by RebalanceLitePullImpl.messageQueueChanged(), not by DefaultLitePullConsumerImpl.
   
   ```
   public class RebalanceLitePullImpl extends RebalanceImpl {
   
       private final DefaultLitePullConsumerImpl litePullConsumerImpl;
   
       //....
   
       @Override
       public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
           MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
           if (messageQueueListener != null) {
               try {
                   messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
               } catch (Throwable e) {
                   log.error("messageQueueChanged exception", e);
               }
           }
       }
   }
   ```
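
The call path described above can be modeled without the RocketMQ client. The sketch below is a simplified, hypothetical model: `QueueListener`, `SingleSlotConsumer`, and the integer queue ids are stand-ins, not RocketMQ types. It assumes the consumer holds one listener slot, that `subscribe()` fills the slot with a built-in listener that records the assignment, and that the rebalance step calls only whatever is in the slot.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for MessageQueueListener; queues are reduced to integer ids.
interface QueueListener {
    void queueChanged(String topic, Set<Integer> all, Set<Integer> divided);
}

// Hypothetical model of a consumer with a single listener slot.
class SingleSlotConsumer {
    private QueueListener listener;
    private final Set<Integer> assigned = new TreeSet<>();

    // Like subscribe() in the thread: installs the built-in listener that records the assignment.
    void subscribe() {
        listener = (topic, all, divided) -> {
            assigned.clear();
            assigned.addAll(divided);
        };
    }

    // Replaces whatever listener is currently in the slot.
    void setListener(QueueListener custom) {
        listener = custom;
    }

    // Like RebalanceLitePullImpl.messageQueueChanged(): calls only the listener in the slot.
    void rebalance(String topic, Set<Integer> all, Set<Integer> divided) {
        if (listener != null) {
            listener.queueChanged(topic, all, divided);
        }
    }

    // Returns a copy of the queues this consumer currently believes it owns.
    Set<Integer> assignedQueues() {
        return new TreeSet<>(assigned);
    }
}

class SingleSlotDemo {
    public static void main(String[] args) {
        Set<Integer> all = new TreeSet<>(Arrays.asList(0, 1, 2, 3));
        Set<Integer> mine = new TreeSet<>(Arrays.asList(0, 1));

        SingleSlotConsumer plain = new SingleSlotConsumer();
        plain.subscribe();
        plain.rebalance("demo-topic", all, mine);
        System.out.println("built-in listener, assigned: " + plain.assignedQueues());

        SingleSlotConsumer custom = new SingleSlotConsumer();
        custom.subscribe();
        custom.setListener((topic, a, d) -> System.out.println("custom listener saw " + d));
        custom.rebalance("demo-topic", all, mine);
        System.out.println("custom listener, assigned: " + custom.assignedQueues());
    }
}
```

In this model the custom listener receives the rebalance event, but the assignment stays empty because the built-in bookkeeping no longer runs. That would match the symptom of `poll()` returning nothing, assuming the real client behaves like the model.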
   
   
   





[GitHub] [rocketmq] lebron374 commented on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
lebron374 commented on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-652970864


   > Testcase to reproduce the problem
   > [LitePullConsumerSubscribe3.java.txt](https://github.com/apache/rocketmq/files/4856365/LitePullConsumerSubscribe3.java.txt)
   > 
   > When we call litePullConsumer.setMessageQueueListener(mql) the consumer cannot poll( ) messages from broker.
   
   Thanks for providing the consumer code. If you can provide the topic info, I can try to debug it.





[GitHub] [rocketmq] henrypoter commented on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
henrypoter commented on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-651716608


   [1.log](https://github.com/apache/rocketmq/files/4851265/1.log)
   





[GitHub] [rocketmq] lebron374 commented on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
lebron374 commented on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653702287


   > > > litePullConsumer
   > > 
   > > 
   > > litePullConsumer.start() should after litePullConsumer.setMessageQueueListener(mql)???
   > 
   > Both not work.
   > Below is the workaround:
   > `consumer.subscribe(topic, "*"); consumer.start(); consumer.poll(); consumer.setMessageQueueListener(mql);`
   
   ```
   org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.MessageQueueListenerImpl
   
       class MessageQueueListenerImpl implements MessageQueueListener {
           @Override
           public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
               MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
               switch (messageModel) {
                   case BROADCASTING:
                       updateAssignedMessageQueue(topic, mqAll);
                       updatePullTask(topic, mqAll);
                       break;
                   case CLUSTERING:
                       updateAssignedMessageQueue(topic, mqDivided);
                       updatePullTask(topic, mqDivided);
                       break;
                   default:
                       break;
               }
           }
       }
   ```





[GitHub] [rocketmq] lebron374 commented on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
lebron374 commented on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653831053


   In your example, with your own MessageQueueListener defined, your consumer may not get any MessageQueue.
   
   Just some tips that may help:
   
   ```
   try {
               MessageQueueListener mql = new MessageQueueListener() {
                   @Override
                   public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
                       System.out.printf("=============messageQueueChanged topic:%s q:%s %n",topic,JSON.toJSONString(mqDivided));
   
                       System.out.printf("queue assigned to consumer[%s] are: %n", litePullConsumer.getInstanceName());
                       for(MessageQueue q: mqDivided){
                           System.out.printf("%d ",q.getQueueId());
                       }
                       System.out.printf("%nmqAll:%s %n",JSON.toJSONString(mqAll));
                       //litePullConsumer.assign(mqDivided);
                   }
               };
               // After messageQueueListener is set, the consumer may no longer receive any messages.
               // Without setting messageQueueListener, messages are received normally.
               //litePullConsumer.setMessageQueueListener(mql);
   }
   ```





[GitHub] [rocketmq] lebron374 commented on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
lebron374 commented on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653044103


   > litePullConsumer
   
   Should litePullConsumer.start() be called after litePullConsumer.setMessageQueueListener(mql)?
   





[GitHub] [rocketmq] lebron374 edited a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
lebron374 edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653831053


   In your example, with your own MessageQueueListener defined, your consumer may not get any MessageQueue to consume from.
   
   Just some tips that may help; you can also refer to https://www.jianshu.com/p/fc6e4cfe39cb
   
   ```
   try {
               MessageQueueListener mql = new MessageQueueListener() {
                   @Override
                   public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
                       System.out.printf("=============messageQueueChanged topic:%s q:%s %n",topic,JSON.toJSONString(mqDivided));
   
                       System.out.printf("queue assigned to consumer[%s] are: %n", litePullConsumer.getInstanceName());
                       for(MessageQueue q: mqDivided){
                           System.out.printf("%d ",q.getQueueId());
                       }
                       System.out.printf("%nmqAll:%s %n",JSON.toJSONString(mqAll));
                       //litePullConsumer.assign(mqDivided);
                   }
               };
               // After messageQueueListener is set, the consumer may no longer receive any messages.
               // Without setting messageQueueListener, messages are received normally.
               //litePullConsumer.setMessageQueueListener(mql);
   }
   ```





[GitHub] [rocketmq] lebron374 commented on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
lebron374 commented on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653702187


   > > > litePullConsumer
   > > 
   > > 
   > > litePullConsumer.start() should after litePullConsumer.setMessageQueueListener(mql)???
   > 
   > Both not work.
   > Below is the workaround:
   > `consumer.subscribe(topic, "*"); consumer.start(); consumer.poll(); consumer.setMessageQueueListener(mql);`
   
   @Override
           public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
               MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
               switch (messageModel) {
                   case BROADCASTING:
                       updateAssignedMessageQueue(topic, mqAll);
                       updatePullTask(topic, mqAll);
                       break;
                   case CLUSTERING:
                       updateAssignedMessageQueue(topic, mqDivided);
                       updatePullTask(topic, mqDivided);
                       break;
                   default:
                       break;
               }
           }
   





[GitHub] [rocketmq] henrypoter commented on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
henrypoter commented on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-651494809


   > @henrypoter would you like to provide more information(logs, env, etc.) about this issue?Did you turn off the automatic creation of topics and subscriptions? If yes, did you manually create it first?
   
   I have turned off the automatic creation of topics, and I created the topic manually. I confirmed that the consumer group was created automatically. The subscription state is OK.
   
   The issue may be caused by the offset table being empty.
   
   ## env
   broker 4.6.0 in DLedger mode
   client 4.6.0, 4.6.1, 4.7.0
   
   ## broker log when running the litePullConsumer 
   
   `
   ==> broker.log <==
   2020-06-30 10:49:44 INFO HeartbeatThread_3 - auto create a subscription group, SubscriptionGroupConfig [groupName=middleware-topic1-dr-group, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
   2020-06-30 10:49:44 INFO HeartbeatThread_3 - create new topic TopicConfig [topicName=%RETRY%middleware-topic1-dr-group, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
   2020-06-30 10:49:44 INFO brokerOutApi_thread_3 - register broker[0]to name server 192.168.11.182:9876 OK
   2020-06-30 10:49:44 INFO brokerOutApi_thread_4 - register broker[0]to name server 192.168.11.181:9876 OK
   2020-06-30 10:49:44 INFO brokerOutApi_thread_2 - register broker[0]to name server 192.168.11.180:9876 OK
   2020-06-30 10:49:44 INFO HeartbeatThread_3 - new consumer connected, group: middleware-topic1-dr-group CONSUME_ACTIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x6f069b95, L:/192.168.11.181:30911 - R:/192.168.11.123:58139], clientId=10.88.0.50@23332, language=JAVA, version=335, lastUpdateTimestamp=1593485384567]
   2020-06-30 10:49:44 INFO HeartbeatThread_3 - subscription changed, add new topic, group: middleware-topic1-dr-group SubscriptionData [classFilterMode=false, topic=middleware-topic, subString=*, tagsSet=[], codeSet=[], subVersion=1593485358956, expressionType=TAG]
   2020-06-30 10:49:44 INFO HeartbeatThread_3 - registerConsumer info changed ConsumerData [groupName=middleware-topic1-dr-group, consumeType=CONSUME_ACTIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_LAST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=middleware-topic, subString=*, tagsSet=[], codeSet=[], subVersion=1593485358956, expressionType=TAG]]] 192.168.11.123:58139`
   
   ## The litePullConsumer cannot poll any messages, although there are messages in the topic:
   
   `List<MessageExt> messageExts = litePullConsumer.poll(30000);`
                       
   ## The litePullConsumer offset table is empty:
   ` OffsetStore offsetStore = litePullConsumer.getOffsetStore();
   Map<MessageQueue, Long>  queueOffsetMap =  offsetStore.cloneOffsetTable(topic);`
   
   ## If I use a push consumer with the same consumer group name to subscribe to the topic, it consumes normally.
   If I then kill the push consumer and run the litePullConsumer again, it works.





[GitHub] [rocketmq] lebron374 removed a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
lebron374 removed a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-652970864


   > Testcase to reproduce the problem
   > [LitePullConsumerSubscribe3.java.txt](https://github.com/apache/rocketmq/files/4856365/LitePullConsumerSubscribe3.java.txt)
   > 
   > When we call litePullConsumer.setMessageQueueListener(mql) the consumer cannot poll( ) messages from broker.
   
   Thanks for providing the consumer code. If you can provide the topic info, I can try to debug it.





[GitHub] [rocketmq] henrypoter edited a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
henrypoter edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-655238278


   > > Testcase to reproduce the problem
   > > [LitePullConsumerSubscribe3.java.txt](https://github.com/apache/rocketmq/files/4856365/LitePullConsumerSubscribe3.java.txt)
   > > When we call litePullConsumer.setMessageQueueListener(mql) the consumer cannot poll( ) messages from broker.
   > 
   > Thanks for your details about this issue, and I read your code listed above, Rocketmq's subscribe() mode does not support custom MessageQueueListener, because in subscribe mode, pull consumer will automatically do load balancing. If you change this, it will not work. If you don't want rocketmq to manage load balancing, assign() mode will be a better choice.
   > 
   > But this setMessageQueueListener() is really best to be hidden from users, are you willing to submit a PR to optimize it?
   
   I think the litePullConsumer should provide the ability to listen for queue rebalance events.
   
   I suggest hiding the messageQueueListener in litePullConsumer and adding a MessageQueueRebalanceListener to litePullConsumer.
   
   





[GitHub] [rocketmq] henrypoter edited a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
henrypoter edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-655238278


   > > Testcase to reproduce the problem
   > > [LitePullConsumerSubscribe3.java.txt](https://github.com/apache/rocketmq/files/4856365/LitePullConsumerSubscribe3.java.txt)
   > > When we call litePullConsumer.setMessageQueueListener(mql) the consumer cannot poll( ) messages from broker.
   > 
   > Thanks for your details about this issue, and I read your code listed above, Rocketmq's subscribe() mode does not support custom MessageQueueListener, because in subscribe mode, pull consumer will automatically do load balancing. If you change this, it will not work. If you don't want rocketmq to manage load balancing, assign() mode will be a better choice.
   > 
   > But this setMessageQueueListener() is really best to be hidden from users, are you willing to submit a PR to optimize it?
   
   I think the litePullConsumer should provide the ability to listen for queue rebalance events.
   
   I suggest keeping the messageQueueListener in litePullConsumer and turning the original listener into an internal default, messageQueueListenerInner. RebalanceLitePullImpl would call messageQueueListenerInner, which can never be overridden by the developer, and messageQueueListenerInner would then call the messageQueueListener set by the developer via litePullConsumer.setMessageQueueListener().
   
   Something like this:
   ```
   //DefaultLitePullConsumerImpl.java
   private MessageQueueListener messageQueueListenerInner = new MessageQueueListenerImpl();
   class MessageQueueListenerImpl implements MessageQueueListener {
   
           @Override
           public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
               MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
               MessageQueueListener messageQueueListener = defaultLitePullConsumer.getMessageQueueListener();
               switch (messageModel) {
                   case BROADCASTING:
                       updateAssignedMessageQueue(topic, mqAll);
                       updatePullTask(topic, mqAll);
                       if(null != messageQueueListener){
                           try {
                               messageQueueListener.messageQueueChanged(topic,mqAll,mqDivided);
                           } catch (Throwable e) {
                               log.error("messageQueueChanged exception", e);
                           }
                       }
                       break;
                   case CLUSTERING:
                       updateAssignedMessageQueue(topic, mqDivided);
                       updatePullTask(topic, mqDivided);
                       if(null != messageQueueListener){
                           try {
                               messageQueueListener.messageQueueChanged(topic,mqAll,mqDivided);
                           } catch (Throwable e) {
                               log.error("messageQueueChanged exception", e);
                           }
                       }
                       break;
                   default:
                       break;
               }
           }
       }
   
   
   
   //RebalanceLitePullImpl.java
   @Override
       public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
           MessageQueueListener messageQueueListener = this.litePullConsumerImpl./*getDefaultLitePullConsumer().*/getMessageQueueListenerInner();
           if (messageQueueListener != null) {
               try {
                   messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
               } catch (Throwable e) {
                   log.error("messageQueueChanged exception", e);
               }
           }
       }
   
   ```
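
The proposal above can be exercised in isolation. What follows is a minimal sketch under stated assumptions, not the RocketMQ implementation: `UserQueueListener` and `DelegatingConsumer` are hypothetical names, queues are reduced to integer ids, and the sketch catches `RuntimeException` where the proposal catches `Throwable`. It shows the built-in bookkeeping running first and the application listener being notified afterwards, with listener failures kept from undoing the assignment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for MessageQueueListener; queues are reduced to integer ids.
interface UserQueueListener {
    void queueChanged(String topic, Set<Integer> all, Set<Integer> divided);
}

// Hypothetical consumer in which the built-in bookkeeping cannot be replaced.
class DelegatingConsumer {
    private final Set<Integer> assigned = new TreeSet<>();
    private UserQueueListener userListener; // optional, set by the application

    void setUserListener(UserQueueListener listener) {
        this.userListener = listener;
    }

    // Plays the role of the proposed inner listener.
    void rebalance(String topic, Set<Integer> all, Set<Integer> divided) {
        // Step 1: the built-in bookkeeping always runs.
        assigned.clear();
        assigned.addAll(divided);
        // Step 2: notify the application; a failing listener must not undo step 1.
        if (userListener != null) {
            try {
                userListener.queueChanged(topic, all, divided);
            } catch (RuntimeException e) {
                System.err.println("user listener failed: " + e.getMessage());
            }
        }
    }

    // Returns a copy of the queues this consumer currently believes it owns.
    Set<Integer> assignedQueues() {
        return new TreeSet<>(assigned);
    }
}

class DelegatingDemo {
    public static void main(String[] args) {
        DelegatingConsumer consumer = new DelegatingConsumer();
        List<String> events = new ArrayList<>();
        consumer.setUserListener((topic, all, divided) -> events.add(topic + " -> " + divided));
        consumer.rebalance("demo-topic",
            new TreeSet<>(Arrays.asList(0, 1, 2, 3)), new TreeSet<>(Arrays.asList(2, 3)));
        System.out.println("assigned: " + consumer.assignedQueues());
        System.out.println("events seen by the application: " + events);
    }
}
```

Keeping the user listener in a separate field means `setUserListener()` can be called at any time without affecting which queues the consumer pulls from.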
   
   
   
   
   





[GitHub] [rocketmq] henrypoter commented on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
henrypoter commented on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-652249813


   Test case to reproduce the problem:
   [LitePullConsumerSubscribe3.java.txt](https://github.com/apache/rocketmq/files/4856365/LitePullConsumerSubscribe3.java.txt)
   
   
   When we call litePullConsumer.setMessageQueueListener(mql), the consumer cannot poll() messages from the broker.
   
   





[GitHub] [rocketmq] lebron374 edited a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
lebron374 edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653702287


   > > > litePullConsumer
   > > 
   > > 
   > > litePullConsumer.start() should after litePullConsumer.setMessageQueueListener(mql)???
   > 
   > Both not work.
   > Below is the workaround:
   > `consumer.subscribe(topic, "*"); consumer.start(); consumer.poll(); consumer.setMessageQueueListener(mql);`
   
   The default MessageQueueListenerImpl looks like this. Can you spot any difference?
   
   ```
   org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.MessageQueueListenerImpl
   
       class MessageQueueListenerImpl implements MessageQueueListener {
           @Override
           public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
               MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
               switch (messageModel) {
                   case BROADCASTING:
                       updateAssignedMessageQueue(topic, mqAll);
                       updatePullTask(topic, mqAll);
                       break;
                   case CLUSTERING:
                       updateAssignedMessageQueue(topic, mqDivided);
                       updatePullTask(topic, mqDivided);
                       break;
                   default:
                       break;
               }
           }
       }
   ```





[GitHub] [rocketmq] lebron374 edited a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
lebron374 edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653702287


   > > > litePullConsumer
   > > 
   > > 
   > > litePullConsumer.start() should after litePullConsumer.setMessageQueueListener(mql)???
   > 
   > Both not work.
   > Below is the workaround:
   > `consumer.subscribe(topic, "*"); consumer.start(); consumer.poll(); consumer.setMessageQueueListener(mql);`
   
   I found that the source code in the RocketMQ client looks like this:
   
   ```
   org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.MessageQueueListenerImpl
   
       class MessageQueueListenerImpl implements MessageQueueListener {
           @Override
           public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
               MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
               switch (messageModel) {
                   case BROADCASTING:
                       updateAssignedMessageQueue(topic, mqAll);
                       updatePullTask(topic, mqAll);
                       break;
                   case CLUSTERING:
                       updateAssignedMessageQueue(topic, mqDivided);
                       updatePullTask(topic, mqDivided);
                       break;
                   default:
                       break;
               }
           }
       }
   ```





[GitHub] [rocketmq] henrypoter edited a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
henrypoter edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653790013


   > > > > litePullConsumer
   > > > 
   > > > 
   > > > litePullConsumer.start() should after litePullConsumer.setMessageQueueListener(mql)???
   > > 
   > > 
   > > Both not work.
   > > Below is the workaround:
   > > `consumer.subscribe(topic, "*"); consumer.start(); consumer.poll(); consumer.setMessageQueueListener(mql);`
   > 
   > the default MessageQueueListenerImpl like this,can you find some difference
   > 
   > ```
   > org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.MessageQueueListenerImpl
   > 
   >     class MessageQueueListenerImpl implements MessageQueueListener {
   >         @Override
   >         public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
   >             MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
   >             switch (messageModel) {
   >                 case BROADCASTING:
   >                     updateAssignedMessageQueue(topic, mqAll);
   >                     updatePullTask(topic, mqAll);
   >                     break;
   >                 case CLUSTERING:
   >                     updateAssignedMessageQueue(topic, mqDivided);
   >                     updatePullTask(topic, mqDivided);
   >                     break;
   >                 default:
   >                     break;
   >             }
   >         }
   >     }
   > ```
   
   I set a custom MessageQueueListener so that I can monitor consumer rebalancing and reset the queue offsets myself.
   According to the client code, subscribe() installs the default MessageQueueListenerImpl:
   ```
    public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
           try {
               if (topic == null || topic.equals("")) {
                   throw new IllegalArgumentException("Topic can not be null or empty.");
               }
               setSubscriptionType(SubscriptionType.SUBSCRIBE);
               SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
                   topic, subExpression);
               this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
               this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
               assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
               if (serviceState == ServiceState.RUNNING) {
                   this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                   updateTopicSubscribeInfoWhenSubscriptionChanged();
               }
           } catch (Exception e) {
               throw new MQClientException("subscribe exception", e);
           }
       }
   ```
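   A minimal sketch of the behavior described above, using hypothetical stand-in classes (`SketchConsumer`, `QueueListener`) rather than the real RocketMQ client. It assumes, as the quoted code suggests, that only the default listener updates the set of queues the consumer polls from, so a custom listener is either overwritten by subscribe() or replaces the default and leaves the assignment empty.

   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical stand-ins for illustration only; these are not RocketMQ classes.
   class ListenerOverwriteSketch {

       interface QueueListener {
           void queuesChanged(String topic, Set<Integer> assigned);
       }

       static class SketchConsumer {
           // Queues a poll() would read from. Only the default listener fills this in.
           final Set<Integer> assignedQueues = new HashSet<>();
           private QueueListener listener;

           void setListener(QueueListener custom) {
               this.listener = custom;
           }

           // Like the quoted subscribe(): always installs the default listener.
           void subscribe(String topic) {
               this.listener = (t, assigned) -> {
                   assignedQueues.clear();
                   assignedQueues.addAll(assigned);
               };
           }

           // Stand-in for a rebalance: notifies whichever listener is currently set.
           void rebalance(String topic, Set<Integer> assigned) {
               if (listener != null) {
                   listener.queuesChanged(topic, assigned);
               }
           }
       }

       public static void main(String[] args) {
           Set<Integer> queues = new HashSet<>(Arrays.asList(0, 1));

           // Custom listener set before subscribe(): it is replaced and never called.
           SketchConsumer before = new SketchConsumer();
           before.setListener((t, q) -> System.out.println("custom listener called"));
           before.subscribe("topic");
           before.rebalance("topic", queues);
           System.out.println("set before subscribe, assigned: " + before.assignedQueues);

           // Custom listener set after subscribe(): the default one is gone,
           // so the assignment stays empty and there is nothing to poll.
           SketchConsumer after = new SketchConsumer();
           after.subscribe("topic");
           after.setListener((t, q) -> System.out.println("custom listener called"));
           after.rebalance("topic", queues);
           System.out.println("set after subscribe, assigned: " + after.assignedQueues);
       }
   }
   ```

   Under the same assumption, the workaround order (subscribe, start, poll, then set the listener) would work only because the first rebalance has already populated the assignment through the default listener.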
   
   





[GitHub] [rocketmq] henrypoter edited a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
henrypoter edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653278785


   > > litePullConsumer
   > 
   > Should litePullConsumer.start() be called after litePullConsumer.setMessageQueueListener(mql)?
   
   Neither order works.
   Below is the workaround:
   `consumer.subscribe(topic, "*");
   consumer.start();
   consumer.poll();
   consumer.setMessageQueueListener(mql);`
   





[GitHub] [rocketmq] henrypoter commented on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
henrypoter commented on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-655238278


   > > Testcase to reproduce the problem
   > > [LitePullConsumerSubscribe3.java.txt](https://github.com/apache/rocketmq/files/4856365/LitePullConsumerSubscribe3.java.txt)
   > > When we call litePullConsumer.setMessageQueueListener(mql) the consumer cannot poll( ) messages from broker.
   > 
   > Thanks for the details. I read the code you attached. RocketMQ's subscribe() mode does not support a custom MessageQueueListener, because in subscribe mode the pull consumer does load balancing automatically. If you replace the listener, it will not work. If you don't want RocketMQ to manage load balancing, assign() mode is a better choice.
   > 
   > That said, setMessageQueueListener() really should be hidden from users. Are you willing to submit a PR to improve this?
   
   I think MessageQueueListener should be exposed to application developers.
   I don't want to manage queue rebalancing myself, but I do want to be notified of rebalance events.
   





[GitHub] [rocketmq] henrypoter edited a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
henrypoter edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-655238278


   > > Testcase to reproduce the problem
   > > [LitePullConsumerSubscribe3.java.txt](https://github.com/apache/rocketmq/files/4856365/LitePullConsumerSubscribe3.java.txt)
   > > When we call litePullConsumer.setMessageQueueListener(mql) the consumer cannot poll( ) messages from broker.
   > 
   > Thanks for the details. I read the code you attached. RocketMQ's subscribe() mode does not support a custom MessageQueueListener, because in subscribe mode the pull consumer does load balancing automatically. If you replace the listener, it will not work. If you don't want RocketMQ to manage load balancing, assign() mode is a better choice.
   > 
   > That said, setMessageQueueListener() really should be hidden from users. Are you willing to submit a PR to improve this?
   
   I think litePullConsumer should provide a way to listen for queue rebalance events.
   
   I suggest keeping messageQueueListener in litePullConsumer and turning the original default listener into an internal messageQueueListenerInner. RebalanceLitePullImpl calls messageQueueListenerInner, which developers can never override. messageQueueListenerInner then calls the messageQueueListener that the developer set via litePullConsumer.setMessageQueueListener().
   
   Something like this:
   ```
   //DefaultLitePullConsumerImpl.java
   private MessageQueueListener messageQueueListenerInner = new MessageQueueListenerImpl();
   class MessageQueueListenerImpl implements MessageQueueListener {
   
           @Override
           public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
               MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
               MessageQueueListener messageQueueListener = defaultLitePullConsumer.getMessageQueueListener();
               switch (messageModel) {
                   case BROADCASTING:
                       updateAssignedMessageQueue(topic, mqAll);
                       updatePullTask(topic, mqAll);
                       if(null != messageQueueListener){
                           try {
                               messageQueueListener.messageQueueChanged(topic,mqAll,mqDivided);
                           } catch (Throwable e) {
                               log.error("messageQueueChanged exception", e);
                           }
                       }
                       break;
                   case CLUSTERING:
                       updateAssignedMessageQueue(topic, mqDivided);
                       updatePullTask(topic, mqDivided);
                       if(null != messageQueueListener){
                           try {
                               messageQueueListener.messageQueueChanged(topic,mqAll,mqDivided);
                           } catch (Throwable e) {
                               log.error("messageQueueChanged exception", e);
                           }
                       }
                       break;
                   default:
                       break;
               }
           }
       }
   
   
   
   //RebalanceLitePullImpl.java
   @Override
       public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
           MessageQueueListener messageQueueListener = this.litePullConsumerImpl./*getDefaultLitePullConsumer().*/getMessageQueueListenerInner();
           if (messageQueueListener != null) {
               try {
                   messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
               } catch (Throwable e) {
                   log.error("messageQueueChanged exception", e);
               }
           }
       }
   
   ```
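   The same delegation idea can be exercised without a broker. This is only a sketch with hypothetical stand-in types (`SketchConsumer`, `QueueListener`, integer queue ids), not the RocketMQ client; it factors the duplicated notification into one helper and catches `RuntimeException` rather than `Throwable`, which is a simplification of the proposal above.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   // Hypothetical stand-ins for illustration only; these are not RocketMQ classes.
   class InnerListenerSketch {

       interface QueueListener {
           void queuesChanged(String topic, Set<Integer> all, Set<Integer> divided);
       }

       static class SketchConsumer {
           final Set<Integer> assignedQueues = new HashSet<>();
           private final boolean broadcasting;
           private volatile QueueListener userListener; // optional, set by the application

           SketchConsumer(boolean broadcasting) {
               this.broadcasting = broadcasting;
           }

           void setUserListener(QueueListener listener) {
               this.userListener = listener;
           }

           // Stand-in for a rebalance: always goes through the inner logic first.
           void rebalance(String topic, Set<Integer> all, Set<Integer> divided) {
               Set<Integer> mine = broadcasting ? all : divided;
               assignedQueues.clear();
               assignedQueues.addAll(mine);
               notifyUser(topic, all, divided);
           }

           // The user's listener is informed afterwards and cannot break the assignment.
           private void notifyUser(String topic, Set<Integer> all, Set<Integer> divided) {
               QueueListener listener = userListener;
               if (listener == null) {
                   return;
               }
               try {
                   listener.queuesChanged(topic, all, divided);
               } catch (RuntimeException e) {
                   System.err.println("user listener failed: " + e);
               }
           }
       }

       public static void main(String[] args) {
           Set<Integer> all = new HashSet<>(Arrays.asList(0, 1, 2, 3));
           Set<Integer> divided = new HashSet<>(Arrays.asList(0, 1));
           List<String> events = new ArrayList<>();

           SketchConsumer consumer = new SketchConsumer(false); // clustering
           consumer.setUserListener((topic, a, d) -> {
               events.add(topic + " -> " + d);
               throw new IllegalStateException("application bug");
           });
           consumer.rebalance("topic", all, divided);

           System.out.println("assigned: " + consumer.assignedQueues);
           System.out.println("events seen by user listener: " + events);
       }
   }
   ```

   In this sketch the assignment is updated before the application is notified, so even a listener that throws still leaves the consumer with queues to poll.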
   
   
   
   
   





[GitHub] [rocketmq] henrypoter edited a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
henrypoter edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653278785


   > > litePullConsumer
   > 
   > Should litePullConsumer.start() be called after litePullConsumer.setMessageQueueListener(mql)?
   
   Neither order works.
   Below is the workaround:
   `
   consumer.subscribe(topic, "*");
   consumer.start();
   consumer.poll();
   consumer.setMessageQueueListener(mql);
   `
   





[GitHub] [rocketmq] henrypoter edited a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
henrypoter edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-651494809


   > @henrypoter could you provide more information (logs, environment, etc.) about this issue? Did you turn off the automatic creation of topics and subscriptions? If so, did you create them manually first?
   
   I have turned off automatic topic creation, and I created the topic manually. I confirmed that the consumer group was created automatically. The subscription state is OK.
   
   The issue may be caused by the offset table being empty.
   
   ## Environment
   broker 4.6.0, DLedger mode
   client 4.6.0, 4.6.1, 4.7.0
   
   ## Broker log when running the litePullConsumer
   
   `
   ==> broker.log <==
   2020-06-30 10:49:44 INFO HeartbeatThread_3 - auto create a subscription group, SubscriptionGroupConfig [groupName=middleware-topic1-dr-group, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
   2020-06-30 10:49:44 INFO HeartbeatThread_3 - create new topic TopicConfig [topicName=%RETRY%middleware-topic1-dr-group, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
   2020-06-30 10:49:44 INFO brokerOutApi_thread_3 - register broker[0]to name server 192.168.11.182:9876 OK
   2020-06-30 10:49:44 INFO brokerOutApi_thread_4 - register broker[0]to name server 192.168.11.181:9876 OK
   2020-06-30 10:49:44 INFO brokerOutApi_thread_2 - register broker[0]to name server 192.168.11.180:9876 OK
   2020-06-30 10:49:44 INFO HeartbeatThread_3 - new consumer connected, group: middleware-topic1-dr-group CONSUME_ACTIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x6f069b95, L:/192.168.11.181:30911 - R:/192.168.11.123:58139], clientId=10.88.0.50@23332, language=JAVA, version=335, lastUpdateTimestamp=1593485384567]
   2020-06-30 10:49:44 INFO HeartbeatThread_3 - subscription changed, add new topic, group: middleware-topic1-dr-group SubscriptionData [classFilterMode=false, topic=middleware-topic, subString=*, tagsSet=[], codeSet=[], subVersion=1593485358956, expressionType=TAG]
   2020-06-30 10:49:44 INFO HeartbeatThread_3 - registerConsumer info changed ConsumerData [groupName=middleware-topic1-dr-group, consumeType=CONSUME_ACTIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_LAST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=middleware-topic, subString=*, tagsSet=[], codeSet=[], subVersion=1593485358956, expressionType=TAG]]] 192.168.11.123:58139`
   
   ## The litePullConsumer cannot poll any messages, although there are messages in the topic:
   
   `List<MessageExt> messageExts = litePullConsumer.poll(30000);`
                       
   ## The litePullConsumer offset table is empty:
   ` OffsetStore offsetStore = litePullConsumer.getOffsetStore();
   Map<MessageQueue, Long>  queueOffsetMap =  offsetStore.cloneOffsetTable(topic);`
   
   ## If I use a push consumer with the same consumer group name to subscribe to the topic, it consumes normally.
   If I then kill the push consumer and run the litePullConsumer again, it works.





[GitHub] [rocketmq] duhenglucky commented on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
duhenglucky commented on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-651467337


   @henrypoter could you provide more information (logs, environment, etc.) about this issue? Did you turn off the automatic creation of topics and subscriptions? If so, did you create them manually first?





[GitHub] [rocketmq] henrypoter commented on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
henrypoter commented on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653278785


   > > litePullConsumer
   > 
   > Should litePullConsumer.start() be called after litePullConsumer.setMessageQueueListener(mql)?
   
   Neither order works.
   





[GitHub] [rocketmq] lebron374 edited a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
lebron374 edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653831053


   In your example, with your own MessageQueueListener defined, your consumer may not get any MessageQueue to consume.
   
   Just a tip that may help: you can refer to https://www.jianshu.com/p/fc6e4cfe39cb
   
   ```
   try {
               MessageQueueListener mql = new MessageQueueListener() {
                   @Override
                   public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
                       System.out.printf("=============messageQueueChanged topic:%s q:%s %n",topic,JSON.toJSONString(mqDivided));
   
                       System.out.printf("queue assigned to consumer[%s] are: %n", litePullConsumer.getInstanceName());
                       for(MessageQueue q: mqDivided){
                           System.out.printf("%d ",q.getQueueId());
                       }
                       System.out.printf("%nmqAll:%s %n",JSON.toJSONString(mqAll));
                       //litePullConsumer.assign(mqDivided);
                   }
               };
               // After a messageQueueListener is set, messages may never be received again.
               // Without a messageQueueListener set, messages are received normally.
               //litePullConsumer.setMessageQueueListener(mql);
   }
   ```





[GitHub] [rocketmq] duhenglucky commented on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
duhenglucky commented on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653968175


   > Testcase to reproduce the problem
   > [LitePullConsumerSubscribe3.java.txt](https://github.com/apache/rocketmq/files/4856365/LitePullConsumerSubscribe3.java.txt)
   > 
   > When we call litePullConsumer.setMessageQueueListener(mql) the consumer cannot poll( ) messages from broker.
   
   Thanks for the details. I read the code you attached. RocketMQ's subscribe() mode does not support a custom MessageQueueListener, because in subscribe mode the pull consumer does load balancing automatically. If you replace the listener, it will not work. If you don't want RocketMQ to manage load balancing, assign() mode is a better choice.
   
   That said, setMessageQueueListener() really should be hidden from users. Are you willing to submit a PR to improve this?





[GitHub] [rocketmq] henrypoter commented on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
henrypoter commented on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653790013


   > > > > litePullConsumer
   > > > 
   > > > 
   > > > litePullConsumer.start() should after litePullConsumer.setMessageQueueListener(mql)???
   > > 
   > > 
   > > Both not work.
   > > Below is the workaround:
   > > `consumer.subscribe(topic, "*"); consumer.start(); consumer.poll(); consumer.setMessageQueueListener(mql);`
   > 
   > The default MessageQueueListenerImpl looks like this. Can you spot the difference?
   > 
   > ```
   > org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.MessageQueueListenerImpl
   > 
   >     class MessageQueueListenerImpl implements MessageQueueListener {
   >         @Override
   >         public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
   >             MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
   >             switch (messageModel) {
   >                 case BROADCASTING:
   >                     updateAssignedMessageQueue(topic, mqAll);
   >                     updatePullTask(topic, mqAll);
   >                     break;
   >                 case CLUSTERING:
   >                     updateAssignedMessageQueue(topic, mqDivided);
   >                     updatePullTask(topic, mqDivided);
   >                     break;
   >                 default:
   >                     break;
   >             }
   >         }
   >     }
   > ```
   
   I set a custom MessageQueueListener so that I can monitor consumer rebalancing and reset the queue offsets myself.
   According to the client code, the consumer will use the default MessageQueueListenerImpl:
   ```
    public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
           try {
               if (topic == null || topic.equals("")) {
                   throw new IllegalArgumentException("Topic can not be null or empty.");
               }
               setSubscriptionType(SubscriptionType.SUBSCRIBE);
               SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
                   topic, subExpression);
               this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
               this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
               assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
               if (serviceState == ServiceState.RUNNING) {
                   this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                   updateTopicSubscribeInfoWhenSubscriptionChanged();
               }
           } catch (Exception e) {
               throw new MQClientException("subscribe exception", e);
           }
       }
   ```
   
   





[GitHub] [rocketmq] lebron374 edited a comment on issue #2127: LitePullConsumer not work

Posted by GitBox <gi...@apache.org>.
lebron374 edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653831053


   In your example, with your own MessageQueueListener defined, your consumer may not get any MessageQueue to consume.
   
   Just a tip that may help: you can refer to https://www.jianshu.com/p/fc6e4cfe39cb
   
   ```
   try {
               MessageQueueListener mql = new MessageQueueListener() {
                   @Override
                   public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
                       System.out.printf("=============messageQueueChanged topic:%s q:%s %n",topic,JSON.toJSONString(mqDivided));
   
                       System.out.printf("queue assigned to consumer[%s] are: %n", litePullConsumer.getInstanceName());
                       for(MessageQueue q: mqDivided){
                           System.out.printf("%d ",q.getQueueId());
                       }
                       System.out.printf("%nmqAll:%s %n",JSON.toJSONString(mqAll));
                       //litePullConsumer.assign(mqDivided);
                   }
               };
               // After a messageQueueListener is set, messages may never be received again.
               // Without a messageQueueListener set, messages are received normally.
               //litePullConsumer.setMessageQueueListener(mql);
   }
   ```

