You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/28 05:58:29 UTC

[rocketmq] branch develop updated: [ISSUE #6196] Update lastConsumeTimestamp and lastPullTimestamp in DefaultLitePullConsumer (#6197)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 88000dac2 [ISSUE #6196] Update lastConsumeTimestamp and lastPullTimestamp in DefaultLitePullConsumer (#6197)
88000dac2 is described below

commit 88000dac212a0c721a99ca167132920ce0f45f1b
Author: rongtong <ji...@163.com>
AuthorDate: Tue Feb 28 13:58:21 2023 +0800

    [ISSUE #6196] Update lastConsumeTimestamp and lastPullTimestamp in DefaultLitePullConsumer (#6197)
    
    * Update lastConsumeTimestamp and lastPullTimestamp in DefaultLitePullConsumer
    
    * Add lastConsumeTimestamp and lastPullTimestamp in consumerRunningInfo for DefaultLitePullConsumer
    
    * Pass the check style
---
 .../client/impl/consumer/AssignedMessageQueue.java |  4 ---
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 31 +++++++++++++++++-----
 2 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index 5f89c3c6c..a57cb53b4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -37,10 +37,6 @@ public class AssignedMessageQueue {
         this.rebalanceImpl = rebalanceImpl;
     }
 
-    public Set<MessageQueue> messageQueues() {
-        return assignedMessageQueueState.keySet();
-    }
-
     public boolean isPaused(MessageQueue messageQueue) {
         MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
         if (messageQueueState != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index c6631cb5e..e5aed64d3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -69,6 +69,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
 import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
@@ -158,7 +159,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
 
     // only for test purpose, will be modified by reflection in unit test.
-    @SuppressWarnings("FieldMayBeFinal") private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
+    @SuppressWarnings("FieldMayBeFinal")
+    private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
 
     public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
         this.defaultLitePullConsumer = defaultLitePullConsumer;
@@ -394,7 +396,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         }
         // If assign function invoke before start function, then update pull task after initialization.
         if (subscriptionType == SubscriptionType.ASSIGN) {
-            updateAssignPullTask(assignedMessageQueue.messageQueues());
+            updateAssignPullTask(assignedMessageQueue.getAssignedMessageQueues());
         }
 
         for (String topic : topicMessageQueueChangeListenerMap.keySet()) {
@@ -484,12 +486,14 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     /**
      * subscribe data by customizing messageQueueListener
+     *
      * @param topic
      * @param subExpression
      * @param messageQueueListener
      * @throws MQClientException
      */
-    public synchronized void subscribe(String topic, String subExpression, MessageQueueListener messageQueueListener) throws MQClientException {
+    public synchronized void subscribe(String topic, String subExpression,
+        MessageQueueListener messageQueueListener) throws MQClientException {
         try {
             if (StringUtils.isEmpty(topic)) {
                 throw new IllegalArgumentException("Topic can not be null or empty.");
@@ -516,7 +520,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         }
     }
 
-
     public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
         try {
             if (topic == null || "".equals(topic)) {
@@ -637,6 +640,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                     consumeMessageContext.setSuccess(true);
                     this.executeHookAfter(consumeMessageContext);
                 }
+                consumeRequest.getProcessQueue().setLastConsumeTimestamp(System.currentTimeMillis());
                 return messages;
             }
         } catch (InterruptedException ignore) {
@@ -655,7 +659,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     }
 
     public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException {
-        if (!assignedMessageQueue.messageQueues().contains(messageQueue)) {
+        if (!assignedMessageQueue.getAssignedMessageQueues().contains(messageQueue)) {
             if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                 throw new MQClientException("The message queue is not in assigned list, may be rebalancing, message queue: " + messageQueue, null);
             } else {
@@ -721,7 +725,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     }
 
     public synchronized void commitAll() {
-        for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
+        for (MessageQueue messageQueue : assignedMessageQueue.getAssignedMessageQueues()) {
             try {
                 commit(messageQueue);
             } catch (Exception e) {
@@ -732,6 +736,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     /**
      * Specify offset commit
+     *
      * @param messageQueues
      * @param persist
      */
@@ -760,6 +765,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     /**
      * Get the queue assigned in subscribe mode
+     *
      * @return
      */
     public synchronized Set<MessageQueue> assignment() {
@@ -895,6 +901,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                     return;
                 }
 
+                processQueue.setLastPullTimestamp(System.currentTimeMillis());
+
                 if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
                     scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                     if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
@@ -1172,6 +1180,15 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         info.setProperties(prop);
 
         info.getSubscriptionSet().addAll(this.subscriptions());
+
+        for (MessageQueue mq : this.assignedMessageQueue.getAssignedMessageQueues()) {
+            ProcessQueue pq = this.assignedMessageQueue.getProcessQueue(mq);
+            ProcessQueueInfo pqInfo = new ProcessQueueInfo();
+            pqInfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE));
+            pq.fillProcessQueueInfo(pqInfo);
+            info.getMqTable().put(mq, pqInfo);
+        }
+
         return info;
     }
 
@@ -1234,7 +1251,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     }
 
     public synchronized void registerTopicMessageQueueChangeListener(String topic,
-                                                                     TopicMessageQueueChangeListener listener) throws MQClientException {
+        TopicMessageQueueChangeListener listener) throws MQClientException {
         if (topic == null || listener == null) {
             throw new MQClientException("Topic or listener is null", null);
         }