You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/03 07:34:57 UTC
[rocketmq] 02/03: Add test for logicOffset = -1
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit a71ee3b979aaa626934bee390707c03461cc9def
Author: dongeforever <do...@apache.org>
AuthorDate: Fri Dec 3 15:23:28 2021 +0800
Add test for logicOffset = -1
---
.../broker/processor/SendMessageProcessor.java | 4 +-
.../rocketmq/test/util/MQAdminTestUtils.java | 45 ++++++++++++
.../apache/rocketmq/test/smoke/StaticTopicIT.java | 80 ++++++++++++++++++++++
3 files changed, 128 insertions(+), 1 deletion(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index bdcba39..17f19cb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -137,7 +137,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
//no need to care the broker name
long staticLogicOffset = mappingItem.computeStaticQueueOffsetLoosely(responseHeader.getQueueOffset());
if (staticLogicOffset < 0) {
- return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
+ //if the logic offset is -1, just let it go
+ //maybe we need a dynamic config
+ //return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}
responseHeader.setQueueId(mappingContext.getGlobalId());
responseHeader.setQueueOffset(staticLogicOffset);
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 9de6205..7eaf6a0 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -23,13 +23,17 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.apache.rocketmq.tools.command.CommandUtil;
@@ -187,4 +191,45 @@ public class MQAdminTestUtils {
MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false, defaultMQAdminExt);
}
+ //for test only: computes the remapping of a static topic onto targetBrokers, then delegates to the 7-argument overload to push the configs
+ public static void remappingStaticTopicWithNegativeLogicOffset(String topic, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); // topic config per broker, keyed by broker name
+ assert !brokerConfigMap.isEmpty(); // plain Java assert: only evaluated when the JVM runs with -ea
+ TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers); // yields the map-in / map-out broker sets used below
+ MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
+ remappingStaticTopicWithNegativeLogicOffset(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false, defaultMQAdminExt); // force = false
+ }
+
+ //for test only: pushes the per-broker mapping config to the map-in brokers, then the map-out brokers, then all remaining brokers; blockSeqSize is accepted but not used in this body
+ public static void remappingStaticTopicWithNegativeLogicOffset(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+
+ ClientMetadata clientMetadata = MQAdminUtils.getBrokerMetadata(defaultMQAdminExt); // used below to resolve each broker name to its master address
+ MQAdminUtils.checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata);
+ // now do the remapping
+ //Step1: let the new leader accept writes; its config is pushed as-is, no logic offset is computed first
+ for (String broker: brokersToMapIn) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+ }
+ //Step2: forbid writes on the old leader
+ for (String broker: brokersToMapOut) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+ }
+
+ //Step5: write the non-target brokers. NOTE(review): numbering jumps from Step2 to Step5; presumably Step3/Step4 of the regular remapping flow are skipped on purpose here - confirm
+ for (String broker: brokerConfigMap.keySet()) {
+ if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) {
+ continue; // already written in Step1 or Step2
+ }
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+ }
+ }
+
+
+
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index aeddef6..9b348dd 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -52,6 +52,7 @@ public class StaticTopicIT extends BaseConf {
defaultMQAdminExt.start();
}
+
@Test
public void testNoTargetBrokers() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
@@ -357,6 +358,85 @@ public class StaticTopicIT extends BaseConf {
}
}
+ @Test
+ public void testRemappingWithNegativeLogicOffset() throws Exception { // sends to a static topic before and after remapping it with a logic offset of -1, and checks offsets after each phase
+ String topic = "static" + MQRandomUtils.getRandomTopic();
+ RMQNormalProducer producer = getProducer(nsAddr, topic);
+ int queueNum = 10;
+ int msgEachQueue = 100;
+ //create the static topic with broker1 as the only target broker
+ {
+ Set<String> targetBrokers = new HashSet<>();
+ targetBrokers.add(broker1Name);
+ MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
+ }
+ //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name));
+ //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
+
+ //first round: produce msgEachQueue messages to every queue
+ {
+ List<MessageQueue> messageQueueList = producer.getMessageQueue();
+ for (int i = 0; i < queueNum; i++) {
+ MessageQueue messageQueue = messageQueueList.get(i);
+ Assert.assertEquals(i, messageQueue.getQueueId()); // queues are expected in queue-id order
+ Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
+ }
+ for(MessageQueue messageQueue: messageQueueList) {
+ producer.send(msgEachQueue, messageQueue);
+ }
+ Assert.assertEquals(0, producer.getSendErrorMsg().size());
+ //wait a moment so the cq can be built before offsets are queried
+ Thread.sleep(100);
+ for(MessageQueue messageQueue: messageQueueList) {
+ //Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
+ Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
+ }
+ }
+
+ //remap the static topic to broker2, leaving the logic offset at -1
+ {
+ Set<String> targetBrokers = new HashSet<>();
+ targetBrokers.add(broker2Name);
+ MQAdminTestUtils.remappingStaticTopicWithNegativeLogicOffset(topic, targetBrokers, defaultMQAdminExt);
+ Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); // re-read the configs from the brokers after remapping
+
+ TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
+ Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
+ Assert.assertEquals(queueNum, globalIdMap.size());
+ for (TopicQueueMappingOne mappingOne: globalIdMap.values()) {
+ Assert.assertEquals(broker2Name, mappingOne.getBname());
+ Assert.assertEquals(-1, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset()); // the last mapping item must carry logic offset -1
+ }
+ }
+ //wait so the metadata can be refreshed before sending again
+ Thread.sleep(500);
+ producer.setDebug(true);
+ {
+ ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
+ List<MessageQueue> messageQueueList = producer.getMessageQueue();
+ for (int i = 0; i < queueNum; i++) {
+ MessageQueue messageQueue = messageQueueList.get(i);
+ Assert.assertEquals(i, messageQueue.getQueueId());
+ String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
+ Assert.assertEquals(destBrokerName, broker2Name); // NOTE(review): arguments are in (actual, expected) order; JUnit's assertEquals takes (expected, actual), which only affects the failure message
+ }
+
+ for(MessageQueue messageQueue: messageQueueList) {
+ producer.send(msgEachQueue, messageQueue);
+ }
+ Assert.assertEquals(0, producer.getSendErrorMsg().size());
+ Assert.assertEquals(queueNum * msgEachQueue * 2, producer.getAllOriginMsg().size()); // both send rounds are counted
+ //wait a moment so the cq can be built before offsets are queried
+ Thread.sleep(100);
+ for(MessageQueue messageQueue: messageQueueList) {
+ Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
+ //the max offset should still be msgEachQueue even though a second round of messages was sent
+ Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
+ }
+ }
+ }
+
+
@After
public void tearDown() {
System.setProperty("rocketmq.client.rebalance.waitInterval", "20000");