You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/28 05:58:29 UTC
[rocketmq] branch develop updated: [ISSUE #6196] Update lastConsumeTimestamp and lastPullTimestamp in DefaultLitePullConsumer (#6197)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 88000dac2 [ISSUE #6196] Update lastConsumeTimestamp and lastPullTimestamp in DefaultLitePullConsumer (#6197)
88000dac2 is described below
commit 88000dac212a0c721a99ca167132920ce0f45f1b
Author: rongtong <ji...@163.com>
AuthorDate: Tue Feb 28 13:58:21 2023 +0800
[ISSUE #6196] Update lastConsumeTimestamp and lastPullTimestamp in DefaultLitePullConsumer (#6197)
* Update lastConsumeTimestamp and lastPullTimestamp in DefaultLitePullConsumer
* Add lastConsumeTimestamp and lastPullTimestamp in consumerRunningInfo for DefaultLitePullConsumer
* Pass the Checkstyle check
---
.../client/impl/consumer/AssignedMessageQueue.java | 4 ---
.../impl/consumer/DefaultLitePullConsumerImpl.java | 31 +++++++++++++++++-----
2 files changed, 24 insertions(+), 11 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index 5f89c3c6c..a57cb53b4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -37,10 +37,6 @@ public class AssignedMessageQueue {
this.rebalanceImpl = rebalanceImpl;
}
- public Set<MessageQueue> messageQueues() {
- return assignedMessageQueueState.keySet();
- }
-
public boolean isPaused(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index c6631cb5e..e5aed64d3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -69,6 +69,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
@@ -158,7 +159,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
// only for test purpose, will be modified by reflection in unit test.
- @SuppressWarnings("FieldMayBeFinal") private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
+ @SuppressWarnings("FieldMayBeFinal")
+ private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
@@ -394,7 +396,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
// If assign function invoke before start function, then update pull task after initialization.
if (subscriptionType == SubscriptionType.ASSIGN) {
- updateAssignPullTask(assignedMessageQueue.messageQueues());
+ updateAssignPullTask(assignedMessageQueue.getAssignedMessageQueues());
}
for (String topic : topicMessageQueueChangeListenerMap.keySet()) {
@@ -484,12 +486,14 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
/**
* subscribe data by customizing messageQueueListener
+ *
* @param topic
* @param subExpression
* @param messageQueueListener
* @throws MQClientException
*/
- public synchronized void subscribe(String topic, String subExpression, MessageQueueListener messageQueueListener) throws MQClientException {
+ public synchronized void subscribe(String topic, String subExpression,
+ MessageQueueListener messageQueueListener) throws MQClientException {
try {
if (StringUtils.isEmpty(topic)) {
throw new IllegalArgumentException("Topic can not be null or empty.");
@@ -516,7 +520,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
-
public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try {
if (topic == null || "".equals(topic)) {
@@ -637,6 +640,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
+ consumeRequest.getProcessQueue().setLastConsumeTimestamp(System.currentTimeMillis());
return messages;
}
} catch (InterruptedException ignore) {
@@ -655,7 +659,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException {
- if (!assignedMessageQueue.messageQueues().contains(messageQueue)) {
+ if (!assignedMessageQueue.getAssignedMessageQueues().contains(messageQueue)) {
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
throw new MQClientException("The message queue is not in assigned list, may be rebalancing, message queue: " + messageQueue, null);
} else {
@@ -721,7 +725,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
public synchronized void commitAll() {
- for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
+ for (MessageQueue messageQueue : assignedMessageQueue.getAssignedMessageQueues()) {
try {
commit(messageQueue);
} catch (Exception e) {
@@ -732,6 +736,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
/**
* Specify offset commit
+ *
* @param messageQueues
* @param persist
*/
@@ -760,6 +765,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
/**
* Get the queue assigned in subscribe mode
+ *
* @return
*/
public synchronized Set<MessageQueue> assignment() {
@@ -895,6 +901,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return;
}
+ processQueue.setLastPullTimestamp(System.currentTimeMillis());
+
if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
@@ -1172,6 +1180,15 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
info.setProperties(prop);
info.getSubscriptionSet().addAll(this.subscriptions());
+
+ for (MessageQueue mq : this.assignedMessageQueue.getAssignedMessageQueues()) {
+ ProcessQueue pq = this.assignedMessageQueue.getProcessQueue(mq);
+ ProcessQueueInfo pqInfo = new ProcessQueueInfo();
+ pqInfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE));
+ pq.fillProcessQueueInfo(pqInfo);
+ info.getMqTable().put(mq, pqInfo);
+ }
+
return info;
}
@@ -1234,7 +1251,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
public synchronized void registerTopicMessageQueueChangeListener(String topic,
- TopicMessageQueueChangeListener listener) throws MQClientException {
+ TopicMessageQueueChangeListener listener) throws MQClientException {
if (topic == null || listener == null) {
throw new MQClientException("Topic or listener is null", null);
}