You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/29 08:20:59 UTC

[rocketmq] 01/02: Add a basic produce test for static topic

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit ed2d626720d14859bf1449479d604f1fa72f2554
Author: dongeforever <do...@apache.org>
AuthorDate: Mon Nov 29 15:18:13 2021 +0800

    Add a basic produce test for static topic
---
 .../broker/processor/AdminBrokerProcessor.java     | 21 ++++++++++++------
 .../impl/producer/DefaultMQProducerImpl.java       |  1 +
 .../test/client/rmq/RMQNormalProducer.java         |  9 ++++++++
 .../test/clientinterface/AbstractMQProducer.java   |  1 +
 .../org/apache/rocketmq/test/base/BaseConf.java    |  2 ++
 .../apache/rocketmq/test/smoke/StaticTopicIT.java  | 25 ++++++++++++++++++++++
 .../command/topic/UpdateStaticTopicSubCommand.java |  2 ++
 7 files changed, 54 insertions(+), 7 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index ab62415..e1c2e30 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -767,14 +767,21 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         try {
             requestHeader.setBname(mappingItem.getBname());
             requestHeader.setPhysical(true);
-            //TODO check if it is leader
-            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null);
-            RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
-            if (rpcResponse.getException() != null) {
-                throw rpcResponse.getException();
+            requestHeader.setQueueId(mappingItem.getQueueId());
+            long physicalOffset;
+            //the mapping item belongs to this broker, so read the min offset from the local message store
+            if (mappingItem.getBname().equals(mappingDetail.getBname())) {
+                physicalOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(mappingDetail.getTopic(), mappingItem.getQueueId());
+            } else {
+                RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null);
+                RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+                if (rpcResponse.getException() != null) {
+                    throw rpcResponse.getException();
+                }
+                GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader();
+                physicalOffset = offsetResponseHeader.getOffset();
             }
-            GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader();
-            long offset = mappingItem.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset());
+            long offset = mappingItem.computeStaticQueueOffsetUpToEnd(physicalOffset);
 
             final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
             final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 52a2d9c..bf2ca28 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -725,6 +725,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
         if (null == brokerAddr) {
             tryToFindTopicPublishInfo(mq.getTopic());
+            brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
             brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
         }
 
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
index 6bbec6c..018623d 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
@@ -132,6 +132,12 @@ public class RMQNormalProducer extends AbstractMQProducer {
         }
     }
 
+    public void send(int num, MessageQueue mq) {
+        for (int i = 0; i < num; i++) {
+            sendMQ((Message) getMessageByTag(null), mq);
+        }
+    }
+
     public ResultWrapper sendMQ(Message msg, MessageQueue mq) {
         org.apache.rocketmq.client.producer.SendResult metaqResult = null;
         try {
@@ -145,6 +151,9 @@ public class RMQNormalProducer extends AbstractMQProducer {
             sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK));
             sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
             msgBodys.addData(new String(msg.getBody()));
+            if (originMsgs.getAllData().contains(msg)) {
+                System.out.println("Hash collision");
+            }
             originMsgs.addData(msg);
             originMsgIndex.put(new String(msg.getBody()), metaqResult);
         } catch (Exception e) {
diff --git a/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java
index df6abfc..1e0a19a 100644
--- a/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java
@@ -34,6 +34,7 @@ public abstract class AbstractMQProducer extends MQCollector implements MQProduc
     protected String producerInstanceName = null;
     protected boolean isDebug = false;
 
+
     public AbstractMQProducer(String topic) {
         super();
         producerGroupName = RandomUtil.getStringByUUID();
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 2d59f31..e573180 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.client.consumer.MQPullConsumer;
 import org.apache.rocketmq.client.consumer.MQPushConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MQProducer;
 import org.apache.rocketmq.client.producer.TransactionListener;
 import org.apache.rocketmq.common.MQVersion;
@@ -129,6 +130,7 @@ public class BaseConf {
         return mqAdminExt;
     }
 
+
     public static RMQNormalProducer getProducer(String nsAddr, String topic) {
         return getProducer(nsAddr, topic, false);
     }
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index a4fa864..7c3f0e5 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -1,12 +1,15 @@
 package org.apache.rocketmq.test.smoke;
 
 import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.rpc.ClientMetadata;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
 import org.apache.rocketmq.test.util.MQRandomUtils;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.junit.After;
@@ -17,6 +20,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -61,6 +65,7 @@ public class StaticTopicIT extends BaseConf {
     @Test
     public void testCreateAndRemappingStaticTopic() throws Exception {
         String topic = "static" + MQRandomUtils.getRandomTopic();
+        RMQNormalProducer producer = getProducer(nsAddr, topic);
         int queueNum = 10;
         Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = createStaticTopic(topic, queueNum, getBrokers());
         {
@@ -77,6 +82,26 @@ public class StaticTopicIT extends BaseConf {
             Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
             Assert.assertEquals(queueNum, globalIdMap.size());
         }
+        List<MessageQueue> messageQueueList = producer.getMessageQueue();
+        Assert.assertEquals(queueNum, messageQueueList.size());
+        producer.setDebug(true);
+        for (int i = 0; i < queueNum; i++) {
+            MessageQueue messageQueue = messageQueueList.get(i);
+            Assert.assertEquals(topic, messageQueue.getTopic());
+            Assert.assertEquals(i, messageQueue.getQueueId());
+            Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
+        }
+        for(MessageQueue messageQueue: messageQueueList) {
+            producer.send(100, messageQueue);
+        }
+        //allow some time for the consume queue (cq) to be built
+        Thread.sleep(500);
+        for(MessageQueue messageQueue: messageQueueList) {
+            Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
+            Assert.assertEquals(100, defaultMQAdminExt.maxOffset(messageQueue));
+        }
+        Assert.assertEquals(100 * queueNum, producer.getAllOriginMsg().size());
+        Assert.assertEquals(0, producer.getSendErrorMsg().size());
         /*{
             Set<String> targetBrokers = Collections.singleton(broker1Name);
             Map<String, TopicConfigAndQueueMapping> brokerConfigMapFromRemote = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index d6a680a..a32dc8d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -215,6 +215,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                 String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
                 System.out.println("The old mapping data is written to file " + oldMappingDataFile);
             }
+            //add the existing brokers to the target brokers
+            targetBrokers.addAll(brokerConfigMap.keySet());
 
             //calculate the new data
             TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);