Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/10/28 04:27:29 UTC
[GitHub] [rocketmq] mm23504570 opened a new issue #2378: `NullPointerException` when Consumer shutdown
mm23504570 opened a new issue #2378:
URL: https://github.com/apache/rocketmq/issues/2378
**BUG REPORT**
1. Please describe the issue you observed:
A `NullPointerException` is thrown in the client when a consumer is shut down while the console requests its running info (`consumerRunningInfo`).
2. Please tell us about your environment:
RocketMQ versions 4.7.1 and 4.5.2.
3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):
logs:
```
2020-10-28 12:04:05,005 ERROR RocketmqRemoting - process request exception
java.lang.NullPointerException
at org.apache.rocketmq.client.impl.factory.MQClientInstance.consumerRunningInfo(MQClientInstance.java:1213)
at org.apache.rocketmq.client.impl.ClientRemotingProcessor.getConsumerRunningInfo(ClientRemotingProcessor.java:185)
at org.apache.rocketmq.client.impl.ClientRemotingProcessor.processRequest(ClientRemotingProcessor.java:81)
at org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor.asyncProcessRequest(AsyncNettyRequestProcessor.java:26)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:225)
at org.apache.rocketmq.remoting.netty.RequestTask.run(RequestTask.java:80)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
demo:
``` java
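import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException, RemotingException {
        // Thread 1: start a push consumer, let it run for one second, shut it down, repeat.
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
                    String namesrvAddr = "10.2.0.6:9876";
                    consumer.setNamesrvAddr(namesrvAddr);
                    consumer.setInstanceName("v2");
                    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
                    try {
                        consumer.subscribe("TopicTest", "*");
                    } catch (MQClientException e) {
                        e.printStackTrace();
                    }
                    consumer.registerMessageListener(new MessageListenerConcurrently() {
                        @Override
                        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                ConsumeConcurrentlyContext context) {
                            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                    });
                    try {
                        consumer.start();
                    } catch (MQClientException e) {
                        e.printStackTrace();
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    consumer.shutdown();
                }
            }
        }).start();

        final DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
        defaultMQAdminExt.setNamesrvAddr("10.2.0.6:9876");
        defaultMQAdminExt.setInstanceName("another");
        defaultMQAdminExt.start();

        // Thread 2: keep requesting the consumer's running info, as the console does.
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        // The client ID is "<client IP>@<instance name>"; adjust the IP to your host.
                        ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo("please_rename_unique_group_name_4", "10.5.129.69@v2", true);
                    } catch (RemotingException e) {
                        // Ignored: failures are expected while the consumer restarts.
                    } catch (MQClientException e) {
                        // Ignored: failures are expected while the consumer restarts.
                    } catch (InterruptedException e) {
                        // Ignored: failures are expected while the consumer restarts.
                    }
                }
            }
        }).start();

        // Keep the main thread alive long enough for the race to occur.
        Thread.sleep(100000);
    }
}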
```
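The demo above needs a live name server and broker and depends on timing. The suspected race can also be modelled without any RocketMQ dependency. The following is only a minimal sketch under stated assumptions: `ShutdownRaceModel`, `ConsumerInner`, `register`, `unregister`, and `runningInfo` are hypothetical stand-ins (not RocketMQ APIs) for the client's consumer table and the lookup seen in the stack trace, and it assumes that shutting down a consumer removes its group from that table before the admin request is processed.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ShutdownRaceModel {
    /** Hypothetical stand-in for the consumer object stored per group. */
    interface ConsumerInner {
        String runningInfo();
    }

    /** Hypothetical stand-in for the client's group -> consumer table. */
    private final ConcurrentMap<String, ConsumerInner> consumerTable = new ConcurrentHashMap<>();

    void register(String group, ConsumerInner consumer) {
        consumerTable.put(group, consumer);
    }

    /** Models what shutdown is assumed to do: drop the group from the table. */
    void unregister(String group) {
        consumerTable.remove(group);
    }

    /** Unguarded lookup: dereferences the result of get() without a null check. */
    String runningInfo(String group) {
        ConsumerInner consumer = consumerTable.get(group);
        return consumer.runningInfo(); // throws NullPointerException once the group is gone
    }

    public static void main(String[] args) {
        ShutdownRaceModel client = new ShutdownRaceModel();
        client.register("group_a", () -> "running");
        System.out.println(client.runningInfo("group_a"));

        client.unregister("group_a"); // corresponds to consumer.shutdown() in the demo
        try {
            client.runningInfo("group_a"); // admin request arriving after shutdown
        } catch (NullPointerException e) {
            System.out.println("NullPointerException after unregister");
        }
    }
}
```

In the real client the two steps run on different threads, so the failure only shows up when the request lands in the window after the consumer has been removed.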
suggestion:
``` java
public class MQClientInstance {
    public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) {
        MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
        // The consumer may already have been removed by shutdown(), so validate the lookup.
        if (mqConsumerInner == null) {
            return null;
        }
        ConsumerRunningInfo consumerRunningInfo = mqConsumerInner.consumerRunningInfo();
        .....
    }
    .....
}
```
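Returning `null` only helps if the caller checks for it. The sketch below illustrates the guard together with a caller that turns a missing consumer into an explicit error reply. It is a self-contained model, not RocketMQ code: `GuardedLookupSketch`, `ConsumerInner`, `runningInfo`, and `handleRequest` are hypothetical names, and the reply strings are made up for illustration. How the real request processor treats a `null` result is not shown in this report.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class GuardedLookupSketch {
    /** Hypothetical stand-in for the consumer object stored per group. */
    interface ConsumerInner {
        String runningInfo();
    }

    /** Hypothetical stand-in for the client's group -> consumer table. */
    private final ConcurrentMap<String, ConsumerInner> consumerTable = new ConcurrentHashMap<>();

    void register(String group, ConsumerInner consumer) {
        consumerTable.put(group, consumer);
    }

    void unregister(String group) {
        consumerTable.remove(group);
    }

    /** Guarded lookup: returns null when the group is not (or no longer) registered. */
    String runningInfo(String group) {
        ConsumerInner consumer = consumerTable.get(group);
        if (consumer == null) {
            return null;
        }
        return consumer.runningInfo();
    }

    /** Caller side: maps a null result to an error reply instead of dereferencing it. */
    String handleRequest(String group) {
        String info = runningInfo(group);
        if (info == null) {
            return "ERROR: consumer group [" + group + "] not found on this client";
        }
        return "OK: " + info;
    }

    public static void main(String[] args) {
        GuardedLookupSketch client = new GuardedLookupSketch();
        client.register("group_a", () -> "running");
        System.out.println(client.handleRequest("group_a"));

        client.unregister("group_a"); // consumer has shut down
        System.out.println(client.handleRequest("group_a"));
    }
}
```

With the guard in place, a request that arrives after shutdown gets a readable error rather than an exception logged by the remoting layer.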