You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/24 09:31:19 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Finish the test for createStaticTopic
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new cb7e032 Finish the test for createStaticTopic
cb7e032 is described below
commit cb7e032fe38453eeb3dc966b029b15103f56b4fa
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Nov 24 17:30:35 2021 +0800
Finish the test for createStaticTopic
---
.../common/statictopic/LogicQueueMappingItem.java | 37 ++++++++++++++++
.../statictopic/TopicConfigAndQueueMapping.java | 24 +++++++++++
.../statictopic/TopicQueueMappingDetail.java | 23 ++++++++++
.../common/statictopic/TopicQueueMappingInfo.java | 30 +++++++++++++
.../common/statictopic/TopicQueueMappingTest.java | 2 +
.../apache/rocketmq/test/smoke/StaticTopicIT.java | 49 +++++++++++++++-------
6 files changed, 150 insertions(+), 15 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index 16f41e5..959207e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -1,5 +1,7 @@
package org.apache.rocketmq.common.statictopic;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class LogicQueueMappingItem extends RemotingSerializable {
@@ -135,6 +137,41 @@ public class LogicQueueMappingItem extends RemotingSerializable {
this.startOffset = startOffset;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+
+ if (!(o instanceof LogicQueueMappingItem)) return false;
+
+ LogicQueueMappingItem item = (LogicQueueMappingItem) o;
+
+ return new EqualsBuilder()
+ .append(gen, item.gen)
+ .append(queueId, item.queueId)
+ .append(logicOffset, item.logicOffset)
+ .append(startOffset, item.startOffset)
+ .append(endOffset, item.endOffset)
+ .append(timeOfStart, item.timeOfStart)
+ .append(timeOfEnd, item.timeOfEnd)
+ .append(bname, item.bname)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(gen)
+ .append(queueId)
+ .append(bname)
+ .append(logicOffset)
+ .append(startOffset)
+ .append(endOffset)
+ .append(timeOfStart)
+ .append(timeOfEnd)
+ .toHashCode();
+ }
+
@Override
public String toString() {
return "LogicQueueMappingItem{" +
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicConfigAndQueueMapping.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicConfigAndQueueMapping.java
index af891d0..cef6418 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicConfigAndQueueMapping.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicConfigAndQueueMapping.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.common.statictopic;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.rocketmq.common.TopicConfig;
public class TopicConfigAndQueueMapping extends TopicConfig {
@@ -36,4 +38,26 @@ public class TopicConfigAndQueueMapping extends TopicConfig {
public void setMappingDetail(TopicQueueMappingDetail mappingDetail) {
this.mappingDetail = mappingDetail;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+
+ if (!(o instanceof TopicConfigAndQueueMapping)) return false;
+
+ TopicConfigAndQueueMapping that = (TopicConfigAndQueueMapping) o;
+
+ return new EqualsBuilder()
+ .appendSuper(super.equals(o))
+ .append(mappingDetail, that.mappingDetail)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .appendSuper(super.hashCode())
+ .append(mappingDetail)
+ .toHashCode();
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
index 4a8bae3..30db209 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
@@ -16,6 +16,9 @@
*/
package org.apache.rocketmq.common.statictopic;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -124,4 +127,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
public void setHostedQueues(ConcurrentMap<Integer, List<LogicQueueMappingItem>> hostedQueues) {
this.hostedQueues = hostedQueues;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+
+ if (!(o instanceof TopicQueueMappingDetail)) return false;
+
+ TopicQueueMappingDetail that = (TopicQueueMappingDetail) o;
+
+ return new EqualsBuilder()
+ .append(hostedQueues, that.hostedQueues)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(hostedQueues)
+ .toHashCode();
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
index 7636fd5..53041aa 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.common.statictopic;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.concurrent.ConcurrentHashMap;
@@ -93,5 +95,33 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
this.currIdMap = currIdMap;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof TopicQueueMappingInfo)) return false;
+
+ TopicQueueMappingInfo info = (TopicQueueMappingInfo) o;
+
+ return new EqualsBuilder()
+ .append(totalQueues, info.totalQueues)
+ .append(epoch, info.epoch)
+ .append(dirty, info.dirty)
+ .append(topic, info.topic)
+ .append(bname, info.bname)
+ .append(currIdMap, info.currIdMap)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(topic)
+ .append(totalQueues)
+ .append(bname)
+ .append(epoch)
+ .append(dirty)
+ .append(currIdMap)
+ .toHashCode();
+ }
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
index d571f65..9b9ab6b 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
@@ -31,6 +31,7 @@ public class TopicQueueMappingTest {
//test the decode encode
{
LogicQueueMappingItem mappingItemFromJson = RemotingSerializable.fromJson(mappingItemJson, LogicQueueMappingItem.class);
+ Assert.assertEquals(mappingItem, mappingItemFromJson);
Assert.assertEquals(mappingItemJson, RemotingSerializable.toJson(mappingItemFromJson, false));
}
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis());
@@ -48,6 +49,7 @@ public class TopicQueueMappingTest {
TopicQueueMappingDetail mappingDetailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class);
Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().size());
Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().get(0).size());
+ Assert.assertEquals(mappingItem, mappingDetailFromJson.getHostedQueues().get(0).get(0));
Assert.assertEquals(mappingDetailJson, RemotingSerializable.toJson(mappingDetailFromJson, false));
}
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index bb21dc2..73215ba 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -6,6 +6,7 @@ import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
+import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -16,6 +17,8 @@ import org.junit.FixMethodOrder;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -42,32 +45,48 @@ public class StaticTopicIT extends BaseConf {
clientMetadata.refreshClusterInfo(clusterInfo);
}
+ public Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers) throws Exception {
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+ Assert.assertTrue(brokerConfigMap.isEmpty());
+ TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+ //If some succeed, and others fail, it will cause inconsistent data
+ for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+ String broker = entry.getKey();
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = entry.getValue();
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
+ }
+ return brokerConfigMap;
+ }
+
@Test
- public void testCreateStaticTopic() throws Exception {
+ public void testCreateAndRemappingStaticTopic() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
int queueNum = 10;
- Set<String> brokers = getBrokers();
- //create topic
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap = createStaticTopic(topic, queueNum, getBrokers());
{
- Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
- Assert.assertTrue(brokerConfigMap.isEmpty());
- TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, getBrokers(), brokerConfigMap);
- //If some succeed, and others fail, it will cause inconsistent data
- for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMapFromRemote = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+ Assert.assertEquals(2, brokerConfigMapFromRemote.size());
+ for (Map.Entry<String, TopicConfigAndQueueMapping> entry: brokerConfigMapFromRemote.entrySet()) {
String broker = entry.getKey();
- String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
- defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
+ TopicConfigAndQueueMapping configMappingLocal = brokerConfigMap.get(broker);
+ Assert.assertNotNull(configMappingLocal);
+ Assert.assertEquals(configMapping, configMappingLocal);
}
+ TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMapFromRemote);
+ Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(brokerConfigMapFromRemote.values())), false, true);
+ Assert.assertEquals(queueNum, globalIdMap.size());
}
- Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
-
- TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
- Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
- Assert.assertEquals(queueNum, globalIdMap.size());
+ /*{
+ Set<String> targetBrokers = Collections.singleton(broker1Name);
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMapFromRemote = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+ TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMapFromRemote, targetBrokers);
+ }*/
}
+
@After
public void tearDown() {
super.shutdown();