You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this text copy.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2021/12/20 12:08:03 UTC
[rocketmq] branch develop updated: [ISSUE #1097] Fix null pointer problem when consumption start time is null (#1098)
This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e425c09 [ISSUE #1097] Fix null pointer problem when consumption start time is null (#1098)
e425c09 is described below
commit e425c097ad6ab2e31786fc343805356363ddcf79
Author: ssssssnake <15...@qq.com>
AuthorDate: Mon Dec 20 20:07:45 2021 +0800
[ISSUE #1097] Fix null pointer problem when consumption start time is null (#1098)
---
.../apache/rocketmq/client/impl/consumer/ProcessQueue.java | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 21798d8..ba00aae 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -27,6 +27,8 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang3.StringUtils;
+
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
@@ -85,10 +87,14 @@ public class ProcessQueue {
try {
this.treeMapLock.readLock().lockInterruptibly();
try {
- if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
- msg = msgTreeMap.firstEntry().getValue();
+ if (!msgTreeMap.isEmpty()) {
+ String consumeStartTimeStamp = MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue());
+ if (StringUtils.isNotEmpty(consumeStartTimeStamp) && System.currentTimeMillis() - Long.parseLong(consumeStartTimeStamp) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
+ msg = msgTreeMap.firstEntry().getValue();
+ } else {
+ break;
+ }
} else {
-
break;
}
} finally {