Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/10/28 04:27:29 UTC

[GitHub] [rocketmq] mm23504570 opened a new issue #2378: `NullPointerException` when Consumer shutdown

mm23504570 opened a new issue #2378:
URL: https://github.com/apache/rocketmq/issues/2378


   **BUG REPORT**
   
   1. Please describe the issue you observed:
   A `NullPointerException` is thrown on the client when a consumer has shut down and the console then invokes `consumerRunningInfo` for it.
   
   2. Please tell us about your environment:
   RocketMQ versions 4.7.1 and 4.5.2.
   3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):
   
   logs: 
   ```
   2020-10-28 12:04:05,005 ERROR RocketmqRemoting - process request exception
   java.lang.NullPointerException
           at org.apache.rocketmq.client.impl.factory.MQClientInstance.consumerRunningInfo(MQClientInstance.java:1213)
           at org.apache.rocketmq.client.impl.ClientRemotingProcessor.getConsumerRunningInfo(ClientRemotingProcessor.java:185)
           at org.apache.rocketmq.client.impl.ClientRemotingProcessor.processRequest(ClientRemotingProcessor.java:81)
           at org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor.asyncProcessRequest(AsyncNettyRequestProcessor.java:26)
           at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:225)
           at org.apache.rocketmq.remoting.netty.RequestTask.run(RequestTask.java:80)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
           at java.util.concurrent.FutureTask.run(FutureTask.java)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   
   ```
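
   The top frame of the trace is a dereference inside `MQClientInstance.consumerRunningInfo`. Below is a minimal, broker-free sketch of that failure mode, assuming the cause is a table lookup that returns `null` once the group has been removed. `ShutdownRaceSketch`, `Inner`, and `TABLE` are hypothetical stand-ins, not RocketMQ classes.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical stand-ins; none of these names come from RocketMQ.
   class ShutdownRaceSketch {

       /** Plays the role of a registered consumer. */
       static class Inner {
           String runningInfo() {
               return "running";
           }
       }

       /** Plays the role of the client-side table keyed by consumer group. */
       static final Map<String, Inner> TABLE = new ConcurrentHashMap<>();

       /** Unguarded lookup: dereferences whatever the map returns. */
       static String unguardedRunningInfo(String group) {
           Inner inner = TABLE.get(group); // null once the group has been removed
           return inner.runningInfo();     // throws NullPointerException when inner is null
       }

       public static void main(String[] args) {
           TABLE.put("group_a", new Inner());
           System.out.println(unguardedRunningInfo("group_a")); // group still registered

           TABLE.remove("group_a"); // stands in for what a shutdown does in this sketch
           try {
               unguardedRunningInfo("group_a");
           } catch (NullPointerException e) {
               System.out.println("NPE after removal");
           }
       }
   }
   ```

   In the real client the removal and the lookup happen on different threads, so the failure only shows up when a request arrives after the consumer has been unregistered.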
   
   Demo that reproduces the issue:
   
   ``` java
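   import java.util.List;

   import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
   import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
   import org.apache.rocketmq.client.exception.MQClientException;
   import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
   import org.apache.rocketmq.common.message.MessageExt;
   import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
   import org.apache.rocketmq.remoting.exception.RemotingException;
   import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

   public class Consumer {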
   
       public static void main(String[] args) throws InterruptedException, MQClientException, RemotingException {
           new Thread(new Runnable() {
               @Override
               public void run() {
                   while (true){
                       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
                       String namesrvAddr = "10.2.0.6:9876";
                       consumer.setNamesrvAddr(namesrvAddr);
                       consumer.setInstanceName("v2");
                       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
                       try {
                           consumer.subscribe("TopicTest", "*");
                       } catch (MQClientException e) {
                           e.printStackTrace();
                       }
                       consumer.registerMessageListener(new MessageListenerConcurrently() {
   
                           @Override
                           public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                           ConsumeConcurrentlyContext context) {
                               System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                           }
                       });
                       try {
                           consumer.start();
                       } catch (MQClientException e) {
                           e.printStackTrace();
                       }
                       try {
                           Thread.sleep(1000);
                       } catch (InterruptedException e) {
                           e.printStackTrace();
                       }
                       consumer.shutdown();
                   }
               }
           }).start();
   
           DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
           defaultMQAdminExt.setNamesrvAddr("10.2.0.6:9876");
           defaultMQAdminExt.setInstanceName("another");
           defaultMQAdminExt.start();
           new Thread(new Runnable() {
               @Override
               public void run() {
                   while (true) {
   
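                       try {
                           // Keep polling the running info so that some requests arrive right after consumer.shutdown().
                           ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo("please_rename_unique_group_name_4", "10.5.129.69@v2", true);
                       } catch (RemotingException | MQClientException | InterruptedException e) {
                           // Deliberately ignored: this loop only exists to keep querying the consumer while it restarts.
                       }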
                   }
               }
           }).start();
           Thread.sleep(100000);
       }
   }
   
   ```
   Suggestion: check the looked-up consumer for `null` before using it:
   ``` java
   public class MQClientInstance {

       public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) {
           MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
           // The consumer may already have been removed from the table by shutdown(); do not dereference null.
           if (mqConsumerInner == null) {
               return null;
           }

           ConsumerRunningInfo consumerRunningInfo = mqConsumerInner.consumerRunningInfo();
           .....
       }
       .....
   }
   ```
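
   Returning `null` moves the decision to the caller, which then has to turn it into an error reply instead of dereferencing it. A rough sketch of that caller side, under the assumption that the request handler answers with a plain status string; `GuardedLookupSketch`, `runningInfo`, and `handleRequest` are hypothetical names, and the real RocketMQ response codes are not reproduced here.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Supplier;

   // Hypothetical stand-ins; the real RocketMQ classes and response codes are not used here.
   class GuardedLookupSketch {

       /** Group name -> supplier of that consumer's running info. */
       static final Map<String, Supplier<String>> TABLE = new ConcurrentHashMap<>();

       /** Guarded lookup: returns null when the group is no longer registered. */
       static String runningInfo(String group) {
           Supplier<String> inner = TABLE.get(group);
           if (inner == null) {
               return null;
           }
           return inner.get();
       }

       /** Caller side: maps a null result to an error reply instead of failing. */
       static String handleRequest(String group) {
           String info = runningInfo(group);
           if (info == null) {
               return "ERROR: consumer group <" + group + "> is not registered on this client";
           }
           return "OK: " + info;
       }

       public static void main(String[] args) {
           TABLE.put("group_a", () -> "running");
           System.out.println(handleRequest("group_a")); // prints "OK: running"

           TABLE.remove("group_a"); // stands in for a shutdown in this sketch
           System.out.println(handleRequest("group_a")); // prints the ERROR line
       }
   }
   ```

   Reading the table once into a local variable and checking that local is what makes the guard safe: a second `get` after the check could still observe the removal.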

