You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/03 07:34:57 UTC

[rocketmq] 02/03: Add test for logicOffset = -1

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit a71ee3b979aaa626934bee390707c03461cc9def
Author: dongeforever <do...@apache.org>
AuthorDate: Fri Dec 3 15:23:28 2021 +0800

    Add test for logicOffset = -1
---
 .../broker/processor/SendMessageProcessor.java     |  4 +-
 .../rocketmq/test/util/MQAdminTestUtils.java       | 45 ++++++++++++
 .../apache/rocketmq/test/smoke/StaticTopicIT.java  | 80 ++++++++++++++++++++++
 3 files changed, 128 insertions(+), 1 deletion(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index bdcba39..17f19cb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -137,7 +137,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             //no need to care the broker name
             long staticLogicOffset = mappingItem.computeStaticQueueOffsetLoosely(responseHeader.getQueueOffset());
             if (staticLogicOffset < 0) {
-                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
+                //if the logic offset is -1, just let it go
+                //maybe we need a dynamic config
+                //return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
             }
             responseHeader.setQueueId(mappingContext.getGlobalId());
             responseHeader.setQueueOffset(staticLogicOffset);
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 9de6205..7eaf6a0 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -23,13 +23,17 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ForkJoinPool;
 import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.admin.MQAdminUtils;
 import org.apache.rocketmq.tools.command.CommandUtil;
@@ -187,4 +191,45 @@ public class MQAdminTestUtils {
         MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false, defaultMQAdminExt);
     }
 
+    //for test only
+    public static void remappingStaticTopicWithNegativeLogicOffset(String topic, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+        assert !brokerConfigMap.isEmpty();
+        TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
+        MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
+        remappingStaticTopicWithNegativeLogicOffset(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false, defaultMQAdminExt);
+    }
+
+    //for test only
+    public static void remappingStaticTopicWithNegativeLogicOffset(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+
+        ClientMetadata clientMetadata = MQAdminUtils.getBrokerMetadata(defaultMQAdminExt);
+        MQAdminUtils.checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata);
+        // now do the remapping
+        //Step1: let the new leader can be write without the logicOffset
+        for (String broker: brokersToMapIn) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+        }
+        //Step2: forbid the write of old leader
+        for (String broker: brokersToMapOut) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+        }
+
+        //Step5: write the non-target brokers
+        for (String broker: brokerConfigMap.keySet()) {
+            if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) {
+                continue;
+            }
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+        }
+    }
+
+
+
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index aeddef6..9b348dd 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -52,6 +52,7 @@ public class StaticTopicIT extends BaseConf {
         defaultMQAdminExt.start();
     }
 
+
     @Test
     public void testNoTargetBrokers() throws Exception {
         String topic = "static" + MQRandomUtils.getRandomTopic();
@@ -357,6 +358,85 @@ public class StaticTopicIT extends BaseConf {
         }
     }
 
+    @Test
+    public void testRemappingWithNegativeLogicOffset() throws Exception {
+        String topic = "static" + MQRandomUtils.getRandomTopic();
+        RMQNormalProducer producer = getProducer(nsAddr, topic);
+        int queueNum = 10;
+        int msgEachQueue = 100;
+        //create static topic
+        {
+            Set<String> targetBrokers = new HashSet<>();
+            targetBrokers.add(broker1Name);
+            MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
+        }
+        //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name));
+        //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
+
+        //produce the messages
+        {
+            List<MessageQueue> messageQueueList = producer.getMessageQueue();
+            for (int i = 0; i < queueNum; i++) {
+                MessageQueue messageQueue = messageQueueList.get(i);
+                Assert.assertEquals(i, messageQueue.getQueueId());
+                Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
+            }
+            for(MessageQueue messageQueue: messageQueueList) {
+                producer.send(msgEachQueue, messageQueue);
+            }
+            Assert.assertEquals(0, producer.getSendErrorMsg().size());
+            //leave the time to build the cq
+            Thread.sleep(100);
+            for(MessageQueue messageQueue: messageQueueList) {
+                //Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
+                Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
+            }
+        }
+
+        //remapping the static topic with -1 logic offset
+        {
+            Set<String> targetBrokers = new HashSet<>();
+            targetBrokers.add(broker2Name);
+            MQAdminTestUtils.remappingStaticTopicWithNegativeLogicOffset(topic, targetBrokers, defaultMQAdminExt);
+            Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+
+            TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
+            Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
+            Assert.assertEquals(queueNum, globalIdMap.size());
+            for (TopicQueueMappingOne mappingOne: globalIdMap.values()) {
+                Assert.assertEquals(broker2Name, mappingOne.getBname());
+                Assert.assertEquals(-1, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
+            }
+        }
+        //leave the time to refresh the metadata
+        Thread.sleep(500);
+        producer.setDebug(true);
+        {
+            ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
+            List<MessageQueue> messageQueueList = producer.getMessageQueue();
+            for (int i = 0; i < queueNum; i++) {
+                MessageQueue messageQueue = messageQueueList.get(i);
+                Assert.assertEquals(i, messageQueue.getQueueId());
+                String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
+                Assert.assertEquals(destBrokerName, broker2Name);
+            }
+
+            for(MessageQueue messageQueue: messageQueueList) {
+                producer.send(msgEachQueue, messageQueue);
+            }
+            Assert.assertEquals(0, producer.getSendErrorMsg().size());
+            Assert.assertEquals(queueNum * msgEachQueue * 2, producer.getAllOriginMsg().size());
+            //leave the time to build the cq
+            Thread.sleep(100);
+            for(MessageQueue messageQueue: messageQueueList) {
+                Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
+                //the max offset should still be msgEachQueue
+                Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
+            }
+        }
+    }
+
+
     @After
     public void tearDown() {
         System.setProperty("rocketmq.client.rebalance.waitInterval", "20000");