You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/02/08 02:43:52 UTC
[rocketmq] 09/17: Fix test for consumer offset
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit a767cc142423b62fc50fc21fb50c1b741203af2e
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Jan 5 17:11:31 2022 +0800
Fix test for consumer offset
---
.../apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 10 ++++++----
.../org/apache/rocketmq/test/statictopic/StaticTopicIT.java | 3 +++
2 files changed, 9 insertions(+), 4 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 568a728..5b8a19f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1176,6 +1176,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
continue;
}
+ TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
+
{
SubscriptionData findSubscriptionData =
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
@@ -1206,14 +1208,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
// for a static topic, a negative consumerOffset must not be reset to zero because of the "double read check" strategy
// so the reset-to-zero logic is kept only for dynamic topics (those without a queue mapping)
// maybe we should remove it in the future
- if (consumerOffset < 0)
- consumerOffset = 0;
+ if (mappingDetail == null) {
+ if (consumerOffset < 0)
+ consumerOffset = 0;
+ }
offsetWrapper.setBrokerOffset(brokerOffset);
offsetWrapper.setConsumerOffset(consumerOffset);
- // the consumeOffset is not in this broker for static topic
- // and may get the wrong result
long timeOffset = consumerOffset - 1;
if (timeOffset >= 0) {
long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index 3e8f146..5b3e5fe 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -292,6 +292,7 @@ public class StaticTopicIT extends BaseConf {
String group = initConsumerGroup();
RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+ long start = System.currentTimeMillis();
int queueNum = 10;
int msgEachQueue = 100;
@@ -314,6 +315,7 @@ public class StaticTopicIT extends BaseConf {
Assert.assertNotNull(wrapper);
Assert.assertEquals(msgEachQueue, wrapper.getBrokerOffset());
Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset());
+ Assert.assertTrue(wrapper.getLastTimestamp() > start);
}
List<String> brokers = ImmutableList.of(broker2Name, broker3Name, broker1Name);
@@ -332,6 +334,7 @@ public class StaticTopicIT extends BaseConf {
Assert.assertNotNull(wrapper);
Assert.assertEquals(msgEachQueue + brokers.size() * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, wrapper.getBrokerOffset());
Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset());
+ Assert.assertTrue(wrapper.getLastTimestamp() > start);
}
consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 1, brokers.size());