You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/24 09:31:19 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Finish the test for createStaticTopic

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new cb7e032  Finish the test for createStaticTopic
cb7e032 is described below

commit cb7e032fe38453eeb3dc966b029b15103f56b4fa
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Nov 24 17:30:35 2021 +0800

    Finish the test for createStaticTopic
---
 .../common/statictopic/LogicQueueMappingItem.java  | 37 ++++++++++++++++
 .../statictopic/TopicConfigAndQueueMapping.java    | 24 +++++++++++
 .../statictopic/TopicQueueMappingDetail.java       | 23 ++++++++++
 .../common/statictopic/TopicQueueMappingInfo.java  | 30 +++++++++++++
 .../common/statictopic/TopicQueueMappingTest.java  |  2 +
 .../apache/rocketmq/test/smoke/StaticTopicIT.java  | 49 +++++++++++++++-------
 6 files changed, 150 insertions(+), 15 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index 16f41e5..959207e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -1,5 +1,7 @@
 package org.apache.rocketmq.common.statictopic;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class LogicQueueMappingItem extends RemotingSerializable {
@@ -135,6 +137,41 @@ public class LogicQueueMappingItem extends RemotingSerializable {
         this.startOffset = startOffset;
     }
 
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        // Anything that is not a LogicQueueMappingItem (null included) is unequal.
+        if (!(o instanceof LogicQueueMappingItem)) return false;
+
+        LogicQueueMappingItem item = (LogicQueueMappingItem) o;
+        // Field-by-field comparison; hashCode() covers the same eight fields.
+        return new EqualsBuilder()
+                .append(gen, item.gen)
+                .append(queueId, item.queueId)
+                .append(logicOffset, item.logicOffset)
+                .append(startOffset, item.startOffset)
+                .append(endOffset, item.endOffset)
+                .append(timeOfStart, item.timeOfStart)
+                .append(timeOfEnd, item.timeOfEnd)
+                .append(bname, item.bname)
+                .isEquals();
+    }
+
+    @Override
+    public int hashCode() { // hashes exactly the fields that equals() compares
+        return new HashCodeBuilder(17, 37)
+                .append(gen)
+                .append(queueId)
+                .append(bname)
+                .append(logicOffset)
+                .append(startOffset)
+                .append(endOffset)
+                .append(timeOfStart)
+                .append(timeOfEnd)
+                .toHashCode();
+    }
+
     @Override
     public String toString() {
         return "LogicQueueMappingItem{" +
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicConfigAndQueueMapping.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicConfigAndQueueMapping.java
index af891d0..cef6418 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicConfigAndQueueMapping.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicConfigAndQueueMapping.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.common.statictopic;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.rocketmq.common.TopicConfig;
 
 public class TopicConfigAndQueueMapping extends TopicConfig {
@@ -36,4 +38,26 @@ public class TopicConfigAndQueueMapping extends TopicConfig {
     public void setMappingDetail(TopicQueueMappingDetail mappingDetail) {
         this.mappingDetail = mappingDetail;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+
+        if (!(o instanceof TopicConfigAndQueueMapping)) return false;
+
+        TopicConfigAndQueueMapping that = (TopicConfigAndQueueMapping) o;
+        // Equal only when the inherited TopicConfig comparison and mappingDetail both match.
+        return new EqualsBuilder()
+                .appendSuper(super.equals(o))
+                .append(mappingDetail, that.mappingDetail)
+                .isEquals();
+    }
+
+    @Override
+    public int hashCode() { // superclass hash combined with mappingDetail, matching equals()
+        return new HashCodeBuilder(17, 37)
+                .appendSuper(super.hashCode())
+                .append(mappingDetail)
+                .toHashCode();
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
index 4a8bae3..30db209 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
@@ -16,6 +16,9 @@
  */
 package org.apache.rocketmq.common.statictopic;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -124,4 +127,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
     public void setHostedQueues(ConcurrentMap<Integer, List<LogicQueueMappingItem>> hostedQueues) {
         this.hostedQueues = hostedQueues;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof TopicQueueMappingDetail)) return false;
+        TopicQueueMappingDetail that = (TopicQueueMappingDetail) o;
+        // Also compare the state inherited from TopicQueueMappingInfo, not just hostedQueues.
+        return new EqualsBuilder()
+                .appendSuper(super.equals(o))
+                .append(hostedQueues, that.hostedQueues)
+                .isEquals();
+    }
+
+    @Override
+    public int hashCode() { // kept in step with equals(): parent hash, then hostedQueues
+        return new HashCodeBuilder(17, 37)
+                .appendSuper(super.hashCode())
+                .append(hostedQueues)
+                .toHashCode();
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
index 7636fd5..53041aa 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.common.statictopic;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 import java.util.concurrent.ConcurrentHashMap;
@@ -93,5 +95,33 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
         this.currIdMap = currIdMap;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
 
+        if (!(o instanceof TopicQueueMappingInfo)) return false;
+
+        TopicQueueMappingInfo info = (TopicQueueMappingInfo) o;
+        // Compares the same six fields that hashCode() hashes.
+        return new EqualsBuilder()
+                .append(totalQueues, info.totalQueues)
+                .append(epoch, info.epoch)
+                .append(dirty, info.dirty)
+                .append(topic, info.topic)
+                .append(bname, info.bname)
+                .append(currIdMap, info.currIdMap)
+                .isEquals();
+    }
+
+    @Override
+    public int hashCode() { // hashes exactly the fields that equals() compares
+        return new HashCodeBuilder(17, 37)
+                .append(topic)
+                .append(totalQueues)
+                .append(bname)
+                .append(epoch)
+                .append(dirty)
+                .append(currIdMap)
+                .toHashCode();
+    }
 }
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
index d571f65..9b9ab6b 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
@@ -31,6 +31,7 @@ public class TopicQueueMappingTest {
         //test the decode encode
         {
             LogicQueueMappingItem mappingItemFromJson = RemotingSerializable.fromJson(mappingItemJson, LogicQueueMappingItem.class);
+            Assert.assertEquals(mappingItem, mappingItemFromJson);
             Assert.assertEquals(mappingItemJson, RemotingSerializable.toJson(mappingItemFromJson, false));
         }
         TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis());
@@ -48,6 +49,7 @@ public class TopicQueueMappingTest {
             TopicQueueMappingDetail mappingDetailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class);
             Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().size());
             Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().get(0).size());
+            Assert.assertEquals(mappingItem, mappingDetailFromJson.getHostedQueues().get(0).get(0));
             Assert.assertEquals(mappingDetailJson, RemotingSerializable.toJson(mappingDetailFromJson, false));
         }
     }
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index bb21dc2..73215ba 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -6,6 +6,7 @@ import org.apache.rocketmq.common.rpc.ClientMetadata;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
+import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
 import org.apache.rocketmq.test.base.BaseConf;
 import org.apache.rocketmq.test.util.MQRandomUtils;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -16,6 +17,8 @@ import org.junit.FixMethodOrder;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -42,32 +45,48 @@ public class StaticTopicIT extends BaseConf {
         clientMetadata.refreshClusterInfo(clusterInfo);
     }
 
+    public Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers) throws Exception {
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+        Assert.assertTrue(brokerConfigMap.isEmpty()); // precondition: no config is reported for this topic yet
+        TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+        // Brokers are updated one at a time: if some calls succeed and others fail, the data is left inconsistent.
+        for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+            String broker = entry.getKey();
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = entry.getValue();
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
+        }
+        return brokerConfigMap; // keyed by broker name; holds the mappings that were just sent
+    }
+
     @Test
-    public void testCreateStaticTopic() throws Exception {
+    public void testCreateAndRemappingStaticTopic() throws Exception { // create a static topic, then verify the brokers' view
         String topic = "static" + MQRandomUtils.getRandomTopic();
         int queueNum = 10;
-        Set<String> brokers = getBrokers();
-        //create topic
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = createStaticTopic(topic, queueNum, getBrokers()); // local copy of what was sent
         {
-            Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
-            Assert.assertTrue(brokerConfigMap.isEmpty());
-            TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, getBrokers(), brokerConfigMap);
-            //If some succeed, and others fail, it will cause inconsistent data
-            for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+            Map<String, TopicConfigAndQueueMapping> brokerConfigMapFromRemote = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+            Assert.assertEquals(brokerConfigMap.size(), brokerConfigMapFromRemote.size()); // derived from the local map, not a hard-coded broker count
+            for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMapFromRemote.entrySet()) {
                 String broker = entry.getKey();
-                String addr = clientMetadata.findMasterBrokerAddr(broker);
                 TopicConfigAndQueueMapping configMapping = entry.getValue();
-                defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
+                TopicConfigAndQueueMapping configMappingLocal = brokerConfigMap.get(broker);
+                Assert.assertNotNull(configMappingLocal);
+                Assert.assertEquals(configMappingLocal, configMapping); // expected (local) first, actual (remote) second
             }
+            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMapFromRemote);
+            Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(brokerConfigMapFromRemote.values())), false, true);
+            Assert.assertEquals(queueNum, globalIdMap.size());
         }
-        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
-
-        TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
-        Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
-        Assert.assertEquals(queueNum, globalIdMap.size());
+        /*{
+            Set<String> targetBrokers = Collections.singleton(broker1Name);
+            Map<String, TopicConfigAndQueueMapping> brokerConfigMapFromRemote = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+            TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMapFromRemote, targetBrokers);
 
+        }*/
     }
 
+
     @After
     public void tearDown() {
         super.shutdown();