You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/02/08 02:43:52 UTC

[rocketmq] 09/17: Fix test for consumer offset

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit a767cc142423b62fc50fc21fb50c1b741203af2e
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Jan 5 17:11:31 2022 +0800

    Fix test for consumer offset
---
 .../apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 10 ++++++----
 .../org/apache/rocketmq/test/statictopic/StaticTopicIT.java    |  3 +++
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 568a728..5b8a19f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1176,6 +1176,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 continue;
             }
 
+            TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
+
             {
                 SubscriptionData findSubscriptionData =
                     this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
@@ -1206,14 +1208,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 // the consumerOffset cannot be zero for static topic because of the "double read check" strategy
                 // just remain the logic for dynamic topic
                 // maybe we should remove it in the future
-                if (consumerOffset < 0)
-                    consumerOffset = 0;
+                if (mappingDetail == null) {
+                    if (consumerOffset < 0)
+                        consumerOffset = 0;
+                }
 
                 offsetWrapper.setBrokerOffset(brokerOffset);
                 offsetWrapper.setConsumerOffset(consumerOffset);
 
-                // the consumeOffset is not in this broker for static topic
-                // and may get the wrong result
                 long timeOffset = consumerOffset - 1;
                 if (timeOffset >= 0) {
                     long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index 3e8f146..5b3e5fe 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -292,6 +292,7 @@ public class StaticTopicIT extends BaseConf {
         String group = initConsumerGroup();
         RMQNormalProducer producer = getProducer(nsAddr, topic);
         RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+        long start = System.currentTimeMillis();
 
         int queueNum = 10;
         int msgEachQueue = 100;
@@ -314,6 +315,7 @@ public class StaticTopicIT extends BaseConf {
             Assert.assertNotNull(wrapper);
             Assert.assertEquals(msgEachQueue, wrapper.getBrokerOffset());
             Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset());
+            Assert.assertTrue(wrapper.getLastTimestamp() > start);
         }
 
         List<String> brokers = ImmutableList.of(broker2Name, broker3Name, broker1Name);
@@ -332,6 +334,7 @@ public class StaticTopicIT extends BaseConf {
             Assert.assertNotNull(wrapper);
             Assert.assertEquals(msgEachQueue + brokers.size() * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, wrapper.getBrokerOffset());
             Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset());
+            Assert.assertTrue(wrapper.getLastTimestamp() > start);
         }
         consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
         consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 1, brokers.size());