You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2021/12/20 12:08:03 UTC

[rocketmq] branch develop updated: [ISSUE #1097] Fix null pointer problem when consumption start time is null (#1098)

This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e425c09  [ISSUE #1097] Fix null pointer problem when consumption start time is null (#1098)
e425c09 is described below

commit e425c097ad6ab2e31786fc343805356363ddcf79
Author: ssssssnake <15...@qq.com>
AuthorDate: Mon Dec 20 20:07:45 2021 +0800

    [ISSUE #1097] Fix null pointer problem when consumption start time is null (#1098)
---
 .../apache/rocketmq/client/impl/consumer/ProcessQueue.java   | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 21798d8..ba00aae 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -27,6 +27,8 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.commons.lang3.StringUtils;
+
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -85,10 +87,14 @@ public class ProcessQueue {
             try {
                 this.treeMapLock.readLock().lockInterruptibly();
                 try {
-                    if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
-                        msg = msgTreeMap.firstEntry().getValue();
+                    if (!msgTreeMap.isEmpty()) {
+                        String consumeStartTimeStamp = MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue());
+                        if (StringUtils.isNotEmpty(consumeStartTimeStamp) && System.currentTimeMillis() - Long.parseLong(consumeStartTimeStamp) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
+                            msg = msgTreeMap.firstEntry().getValue();
+                        } else {
+                            break;
+                        }
                     } else {
-
                         break;
                     }
                 } finally {