You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/02 12:01:37 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Refactor the admin code, reduce exposing apis
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 2dd6594 Refactor the admin code, reduce exposing apis
2dd6594 is described below
commit 2dd659494475d2212609fc24b872a10f422b9180
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Dec 2 20:01:13 2021 +0800
Refactor the admin code, reduce exposing apis
---
.../common/statictopic/TopicQueueMappingUtils.java | 9 +-
.../statictopic/TopicQueueMappingUtilsTest.java | 10 +-
.../util/{MQAdmin.java => MQAdminTestUtils.java} | 28 ++-
.../org/apache/rocketmq/test/base/BaseConf.java | 6 +-
.../rocketmq/test/base/IntegrationTestBase.java | 4 +-
.../apache/rocketmq/test/smoke/StaticTopicIT.java | 87 ++++---
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 9 -
.../tools/admin/DefaultMQAdminExtImpl.java | 133 -----------
.../apache/rocketmq/tools/admin/MQAdminExt.java | 3 -
.../apache/rocketmq/tools/admin/MQAdminUtils.java | 254 +++++++++++++++++++++
.../topic/RemappingStaticTopicSubCommand.java | 24 +-
.../command/topic/UpdateStaticTopicSubCommand.java | 43 +---
12 files changed, 344 insertions(+), 266 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 975a5ba..e56d585 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -422,9 +422,8 @@ public class TopicQueueMappingUtils {
}
}
- public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Set<String> nonTargetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+ public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
checkTargetBrokersComplete(targetBrokers, brokerConfigMap);
- checkNonTargetBrokers(targetBrokers, nonTargetBrokers);
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) {
@@ -484,12 +483,6 @@ public class TopicQueueMappingUtils {
TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, new ArrayList<LogicQueueMappingItem>(Collections.singletonList(mappingItem)));
}
- //set the non target brokers
- for (String broker : nonTargetBrokers) {
- if (!brokerConfigMap.containsKey(broker)) {
- brokerConfigMap.put(broker, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, queueNum, broker, newEpoch)));
- }
- }
// set the topic config
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue();
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java
index 93cad48..f5cd0ef 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java
@@ -128,9 +128,9 @@ public class TopicQueueMappingUtilsTest {
Set<String> targetBrokers = buildTargetBrokers(2 * i);
Set<String> nonTargetBrokers = buildTargetBrokers(2 * i, "test");
queueNum = 10 * i;
- TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, nonTargetBrokers, brokerConfigMap);
+ TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
- Assert.assertEquals(4 * i, brokerConfigMap.size());
+ Assert.assertEquals(2 * i, brokerConfigMap.size());
//do the check manually
Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
@@ -168,7 +168,7 @@ public class TopicQueueMappingUtilsTest {
int queueNum = 7;
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
Set<String> originalBrokers = buildTargetBrokers(2);
- TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet<String>(), brokerConfigMap);
+ TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size());
@@ -213,7 +213,7 @@ public class TopicQueueMappingUtilsTest {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
Set<String> originalBrokers = buildTargetBrokers(2);
{
- TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet<String>(), brokerConfigMap);
+ TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size());
}
@@ -234,7 +234,7 @@ public class TopicQueueMappingUtilsTest {
int queueNum = 10;
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
Set<String> targetBrokers = buildTargetBrokers(2);
- TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<String>(), brokerConfigMap);
+ TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size());
TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next();
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
similarity index 76%
rename from test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
rename to test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 8c0caa6..9de6205 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.test.util;
import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
@@ -25,12 +26,16 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
+import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.apache.rocketmq.tools.command.CommandUtil;
-public class MQAdmin {
- private static Logger log = Logger.getLogger(MQAdmin.class);
+public class MQAdminTestUtils {
+ private static Logger log = Logger.getLogger(MQAdminTestUtils.class);
public static boolean createTopic(String nameSrvAddr, String clusterName, String topic,
int queueNum) {
@@ -163,4 +168,23 @@ public class MQAdmin {
ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
}
+ //should only be test, if some middle operation failed, it dose not backup the brokerConfigMap
+ public static Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+ assert brokerConfigMap.isEmpty();
+ TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+ MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
+ MQAdminUtils.updateTopicConfigMappingAll(brokerConfigMap, defaultMQAdminExt, false);
+ return brokerConfigMap;
+ }
+
+ //should only be test, if some middle operation failed, it dose not backup the brokerConfigMap
+ public static void remappingStaticTopic(String topic, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+ assert !brokerConfigMap.isEmpty();
+ TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
+ MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
+ MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false, defaultMQAdminExt);
+ }
+
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 53a7ab3..ddeee6e 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -31,8 +31,6 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.MQVersion;
@@ -47,7 +45,7 @@ import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.AbstractListener;
-import org.apache.rocketmq.test.util.MQAdmin;
+import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
@@ -122,7 +120,7 @@ public class BaseConf {
}
public static String initConsumerGroup(String group) {
- MQAdmin.createSub(nsAddr, clusterName, group);
+ MQAdminTestUtils.createSub(nsAddr, clusterName, group);
return group;
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 50dc8fc..06f079c 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -34,7 +34,7 @@ import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.test.util.MQAdmin;
+import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.apache.rocketmq.test.util.TestUtils;
public class IntegrationTestBase {
@@ -166,7 +166,7 @@ public class IntegrationTestBase {
boolean createResult;
while (true) {
- createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, queueNumbers);
+ createResult = MQAdminTestUtils.createTopic(nsAddr, clusterName, topic, queueNumbers);
if (createResult) {
break;
} else if (System.currentTimeMillis() - startTime > topicCreateTime) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index c1cc60b..aeddef6 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -1,10 +1,8 @@
package org.apache.rocketmq.test.smoke;
import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageClientExt;
-import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
@@ -12,20 +10,20 @@ import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
-import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
+import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
-import sun.jvm.hotspot.runtime.aarch64.AARCH64CurrentFrameGuess;
import java.util.ArrayList;
import java.util.Collection;
@@ -45,50 +43,44 @@ public class StaticTopicIT extends BaseConf {
private static Logger logger = Logger.getLogger(StaticTopicIT.class);
private DefaultMQAdminExt defaultMQAdminExt;
- private ClientMetadata clientMetadata;
@Before
public void setUp() throws Exception {
System.setProperty("rocketmq.client.rebalance.waitInterval", "500");
defaultMQAdminExt = getAdmin(nsAddr);
waitBrokerRegistered(nsAddr, clusterName);
- clientMetadata = new ClientMetadata();
defaultMQAdminExt.start();
- ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- if (clusterInfo == null
- || clusterInfo.getClusterAddrTable().isEmpty()) {
- throw new RuntimeException("The Cluster info is empty");
- }
- clientMetadata.refreshClusterInfo(clusterInfo);
}
- public Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers) throws Exception {
- Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
- Assert.assertTrue(brokerConfigMap.isEmpty());
- TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);
- Assert.assertEquals(targetBrokers.size(), brokerConfigMap.size());
- //If some succeed, and others fail, it will cause inconsistent data
- for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
- String broker = entry.getKey();
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- TopicConfigAndQueueMapping configMapping = entry.getValue();
- defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
+ @Test
+ public void testNoTargetBrokers() throws Exception {
+ String topic = "static" + MQRandomUtils.getRandomTopic();
+ int queueNum = 10;
+ {
+ Set<String> targetBrokers = new HashSet<>();
+ targetBrokers.add(broker1Name);
+ MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
+ Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+ Assert.assertEquals(2, remoteBrokerConfigMap.size());
+ TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
+ Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
+ Assert.assertEquals(queueNum, globalIdMap.size());
+ TopicConfigAndQueueMapping configMapping = remoteBrokerConfigMap.get(broker2Name);
+ Assert.assertEquals(0, configMapping.getWriteQueueNums());
+ Assert.assertEquals(0, configMapping.getReadQueueNums());
+ Assert.assertEquals(0, configMapping.getMappingDetail().getHostedQueues().size());
}
- return brokerConfigMap;
- }
-
- public void remappingStaticTopic(String topic, Set<String> targetBrokers) throws Exception {
- Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
- Assert.assertFalse(brokerConfigMap.isEmpty());
- TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
- defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false);
- }
-
-
-
- @Test
- public void testNonTargetBrokers() {
+ {
+ Set<String> targetBrokers = new HashSet<>();
+ targetBrokers.add(broker2Name);
+ MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
+ Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+ Assert.assertEquals(2, remoteBrokerConfigMap.size());
+ TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
+ Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
+ Assert.assertEquals(queueNum, globalIdMap.size());
+ }
}
@@ -102,10 +94,10 @@ public class StaticTopicIT extends BaseConf {
int queueNum = 10;
int msgEachQueue = 100;
//create static topic
- Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = createStaticTopic(topic, queueNum, getBrokers());
+ Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt);
//check the static topic config
{
- Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+ Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
Assert.assertEquals(2, remoteBrokerConfigMap.size());
for (Map.Entry<String, TopicConfigAndQueueMapping> entry: remoteBrokerConfigMap.entrySet()) {
String broker = entry.getKey();
@@ -178,7 +170,7 @@ public class StaticTopicIT extends BaseConf {
}
@Test
- public void testDoubleReadCheck() throws Exception {
+ public void testDoubleReadCheckConsumerOffset() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
String group = initConsumerGroup();
RMQNormalProducer producer = getProducer(nsAddr, topic);
@@ -194,7 +186,7 @@ public class StaticTopicIT extends BaseConf {
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker1Name);
- createStaticTopic(topic, queueNum, targetBrokers);
+ MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
}
//produce the messages
{
@@ -217,7 +209,7 @@ public class StaticTopicIT extends BaseConf {
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker2Name);
- remappingStaticTopic(topic, targetBrokers);
+ MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
}
//make the metadata
@@ -226,8 +218,8 @@ public class StaticTopicIT extends BaseConf {
{
producer = getProducer(nsAddr, topic);
+ ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
//just refresh the metadata
- defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
List<MessageQueue> messageQueueList = producer.getMessageQueue();
for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue);
@@ -276,7 +268,7 @@ public class StaticTopicIT extends BaseConf {
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker1Name);
- createStaticTopic(topic, queueNum, targetBrokers);
+ MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
}
//System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name));
//System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
@@ -310,8 +302,8 @@ public class StaticTopicIT extends BaseConf {
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker2Name);
- remappingStaticTopic(topic, targetBrokers);
- Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+ MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
+ Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
@@ -324,6 +316,7 @@ public class StaticTopicIT extends BaseConf {
Thread.sleep(500);
producer.setDebug(true);
{
+ ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
List<MessageQueue> messageQueueList = producer.getMessageQueue();
for (int i = 0; i < queueNum; i++) {
MessageQueue messageQueue = messageQueueList.get(i);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index a36948c..5c80e86 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -220,11 +220,6 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException {
- return this.defaultMQAdminExtImpl.examineTopicConfigAll(clientMetadata, topic);
- }
-
- @Override
public TopicStatsTable examineTopicStats(
String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
@@ -621,9 +616,5 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
}
- @Override
- public void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- this.defaultMQAdminExtImpl.remappingStaticTopic(clientMetadata, topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, blockSeqSize, force);
- }
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 07c4bf3..7b0450c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1056,139 +1056,6 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
@Override
- public void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- for (String broker : brokerConfigMap.keySet()) {
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- if (addr == null) {
- throw new RuntimeException("Can't find addr for broker " + broker);
- }
- }
- // now do the remapping
- //Step1: let the new leader can be write without the logicOffset
- for (String broker: brokersToMapIn) {
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
- createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
- }
- //Step2: forbid the write of old leader
- for (String broker: brokersToMapOut) {
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
- createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
- }
- //Step3: decide the logic offset
- for (String broker: brokersToMapOut) {
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- TopicStatsTable statsTable = examineTopicStats(addr, topic);
- TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
- for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
- List<LogicQueueMappingItem> items = entry.getValue();
- Integer globalId = entry.getKey();
- if (items.size() < 2) {
- continue;
- }
- LogicQueueMappingItem newLeader = items.get(items.size() - 1);
- LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
- if (newLeader.getLogicOffset() > 0) {
- continue;
- }
- TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
- if (topicOffset == null) {
- throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
- }
- //TODO check the max offset, will it return -1?
- if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
- throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
- }
- newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize));
- TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
- //fresh the new leader
- TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items);
- }
- }
- //Step4: write to the new leader with logic offset
- for (String broker: brokersToMapIn) {
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
- createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
- }
- //Step5: write the non-target brokers
- for (String broker: brokerConfigMap.keySet()) {
- if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) {
- continue;
- }
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
- createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
- }
- }
-
- @Override
- public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException {
- Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
- boolean getFromBrokers = false;
- TopicRouteData routeData = null;
- try {
- routeData = examineTopicRouteInfo(topic);
- } catch (MQClientException exception) {
- if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
- throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage());
- } else {
- getFromBrokers = true;
- }
- }
- if (!getFromBrokers) {
- if (routeData != null
- && !routeData.getQueueDatas().isEmpty()) {
- clientMetadata.freshTopicRoute(topic, routeData);
- for (QueueData queueData: routeData.getQueueDatas()) {
- String bname = queueData.getBrokerName();
- String addr = clientMetadata.findMasterBrokerAddr(bname);
- try {
- TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
- //allow the config is null
- if (mapping != null) {
- brokerConfigMap.put(bname, mapping);
- }
- } catch (MQBrokerException exception) {
- if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
- throw exception;
- }
- }
-
- }
- }
- } else {
- log.info("The topic {} dose not exist in nameserver, so check it from all brokers", topic);
- //if cannot get from nameserver, then check all the brokers
- ClusterInfo clusterInfo = examineBrokerClusterInfo();
- if (clusterInfo != null
- && clusterInfo.getBrokerAddrTable() != null) {
- clientMetadata.refreshClusterInfo(clusterInfo);
- }
- for (Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
- String bname = entry.getKey();
- HashMap<Long, String> map = entry.getValue();
- String addr = map.get(MixAll.MASTER_ID);
- if (addr != null) {
- try {
- TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
- //allow the config is null
- if (mapping != null) {
- brokerConfigMap.put(bname, mapping);
- }
- } catch (MQBrokerException exception1) {
- if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
- throw exception1;
- }
- }
- }
- }
- }
- return brokerConfigMap;
- }
-
- @Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 6c27443..c4838e3 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -320,7 +320,4 @@ public interface MQAdminExt extends MQAdmin {
void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQBrokerException;
- Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException;
-
- void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
new file mode 100644
index 0000000..288bac4
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
@@ -0,0 +1,254 @@
+package org.apache.rocketmq.tools.admin;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
+import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MQAdminUtils {
+
+
+ public static ClientMetadata getBrokerMetadata(DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
+ ClientMetadata clientMetadata = new ClientMetadata();
+ refreshClusterInfo(defaultMQAdminExt, clientMetadata);
+ return clientMetadata;
+ }
+
+ public static ClientMetadata getBrokerAndTopicMetadata(String topic, DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, RemotingException, MQBrokerException {
+ ClientMetadata clientMetadata = new ClientMetadata();
+ refreshClusterInfo(defaultMQAdminExt, clientMetadata);
+ refreshTopicRouteInfo(topic, defaultMQAdminExt, clientMetadata);
+ return clientMetadata;
+ }
+
+ public static void refreshClusterInfo(DefaultMQAdminExt defaultMQAdminExt, ClientMetadata clientMetadata) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ if (clusterInfo == null
+ || clusterInfo.getClusterAddrTable().isEmpty()) {
+ throw new RuntimeException("The Cluster info is empty");
+ }
+ clientMetadata.refreshClusterInfo(clusterInfo);
+ }
+
+ public static void refreshTopicRouteInfo(String topic, DefaultMQAdminExt defaultMQAdminExt, ClientMetadata clientMetadata) throws RemotingException, InterruptedException, MQBrokerException {
+ TopicRouteData routeData = null;
+ try {
+ routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ } catch (MQClientException exception) {
+ if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+ throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage());
+ }
+ }
+ if (routeData != null
+ && !routeData.getQueueDatas().isEmpty()) {
+ clientMetadata.freshTopicRoute(topic, routeData);
+ }
+ }
+
+ public static Set<String> getAllBrokersInSameCluster(Collection<String> brokers, DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ if (clusterInfo == null
+ || clusterInfo.getClusterAddrTable().isEmpty()) {
+ throw new RuntimeException("The Cluster info is empty");
+ }
+ Set<String> allBrokers = new HashSet<>();
+ for (String broker: brokers) {
+ if (allBrokers.contains(broker)) {
+ continue;
+ }
+ for (Set<String> clusterBrokers : clusterInfo.getClusterAddrTable().values()) {
+ if (clusterBrokers.contains(broker)) {
+ allBrokers.addAll(clusterBrokers);
+ break;
+ }
+ }
+ }
+ return allBrokers;
+ }
+
+ public static void completeNoTargetBrokers(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
+ TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next();
+ String topic = configMapping.getTopicName();
+ int queueNum = configMapping.getMappingDetail().getTotalQueues();
+ long newEpoch = configMapping.getMappingDetail().getEpoch();
+ Set<String> allBrokers = getAllBrokersInSameCluster(brokerConfigMap.keySet(), defaultMQAdminExt);
+ for (String broker: allBrokers) {
+ if (!brokerConfigMap.containsKey(broker)) {
+ brokerConfigMap.put(broker, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, queueNum, broker, newEpoch)));
+ }
+ }
+ }
+
+ public static void checkIfMasterAlive(Collection<String> brokers, DefaultMQAdminExt defaultMQAdminExt, ClientMetadata clientMetadata) {
+ for (String broker : brokers) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ if (addr == null) {
+ throw new RuntimeException("Can't find addr for broker " + broker);
+ }
+ }
+ }
+
+ public static void updateTopicConfigMappingAll(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
+ ClientMetadata clientMetadata = getBrokerMetadata(defaultMQAdminExt);
+ checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata);
+ //If some succeed, and others fail, it will cause inconsistent data
+ for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+ String broker = entry.getKey();
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = entry.getValue();
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+ }
+ }
+
+ public static void remappingStaticTopic(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+
+ ClientMetadata clientMetadata = MQAdminUtils.getBrokerMetadata(defaultMQAdminExt);
+ MQAdminUtils.checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata);
+ // now do the remapping
+ //Step1: let the new leader can be write without the logicOffset
+ for (String broker: brokersToMapIn) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+ }
+ //Step2: forbid the write of old leader
+ for (String broker: brokersToMapOut) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+ }
+ //Step3: decide the logic offset
+ for (String broker: brokersToMapOut) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
+ TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
+ for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
+ List<LogicQueueMappingItem> items = entry.getValue();
+ Integer globalId = entry.getKey();
+ if (items.size() < 2) {
+ continue;
+ }
+ LogicQueueMappingItem newLeader = items.get(items.size() - 1);
+ LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
+ if (newLeader.getLogicOffset() > 0) {
+ continue;
+ }
+ TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
+ if (topicOffset == null) {
+ throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
+ }
+ //TODO check the max offset, will it return -1?
+ if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
+ throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
+ }
+ newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize));
+ TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
+ //fresh the new leader
+ TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items);
+ }
+ }
+ //Step4: write to the new leader with logic offset
+ for (String broker: brokersToMapIn) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+ }
+ //Step5: write the non-target brokers
+ for (String broker: brokerConfigMap.keySet()) {
+ if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) {
+ continue;
+ }
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+ }
+ }
+
+ public static Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(String topic, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, InterruptedException, MQBrokerException {
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
+ ClientMetadata clientMetadata = new ClientMetadata();
+ boolean getFromBrokers = false;
+ TopicRouteData routeData = null;
+ try {
+ routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ } catch (MQClientException exception) {
+ if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+ throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage());
+ } else {
+ getFromBrokers = true;
+ }
+ }
+ if (!getFromBrokers) {
+ if (routeData != null
+ && !routeData.getQueueDatas().isEmpty()) {
+ clientMetadata.freshTopicRoute(topic, routeData);
+ for (QueueData queueData: routeData.getQueueDatas()) {
+ String bname = queueData.getBrokerName();
+ String addr = clientMetadata.findMasterBrokerAddr(bname);
+ try {
+ TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+ //allow the config is null
+ if (mapping != null) {
+ brokerConfigMap.put(bname, mapping);
+ }
+ } catch (MQBrokerException exception) {
+ if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+ throw exception;
+ }
+ }
+
+ }
+ }
+ } else {
+ //if cannot get from nameserver, then check all the brokers
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ if (clusterInfo != null
+ && clusterInfo.getBrokerAddrTable() != null) {
+ clientMetadata.refreshClusterInfo(clusterInfo);
+ }
+ for (Map.Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
+ String bname = entry.getKey();
+ HashMap<Long, String> map = entry.getValue();
+ String addr = map.get(MixAll.MASTER_ID);
+ if (addr != null) {
+ try {
+ TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+ //allow the config is null
+ if (mapping != null) {
+ brokerConfigMap.put(bname, mapping);
+ }
+ } catch (MQBrokerException exception1) {
+ if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+ throw exception1;
+ }
+ }
+ }
+ }
+ }
+ return brokerConfigMap;
+ }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index b516f1c..9a6f15a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -20,6 +20,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
@@ -30,6 +31,7 @@ import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
@@ -87,7 +89,6 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
- ClientMetadata clientMetadata = new ClientMetadata();
try {
String topic = commandLine.getOptionValue('t').trim();
@@ -99,24 +100,12 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, wrapper.getBrokerConfigMap());
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
- ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- if (clusterInfo == null
- || clusterInfo.getClusterAddrTable().isEmpty()) {
- throw new RuntimeException("The Cluster info is empty");
- }
- clientMetadata.refreshClusterInfo(clusterInfo);
boolean force = false;
if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
force = true;
}
- for (String broker : wrapper.getBrokerConfigMap().keySet()) {
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- if (addr == null) {
- throw new RuntimeException("Can't find addr for broker " + broker);
- }
- }
- defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), 10000, force);
+ MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), 10000, force, defaultMQAdminExt);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
@@ -159,7 +148,6 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
- clientMetadata.refreshClusterInfo(clusterInfo);
{
if (commandLine.hasOption("b")) {
String brokerStrs = commandLine.getOptionValue("b").trim();
@@ -186,7 +174,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
}
}
- brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+ brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
if (brokerConfigMap.isEmpty()) {
throw new RuntimeException("No topic route to do the remapping");
}
@@ -204,7 +192,9 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
System.out.println("The old mapping data is written to file " + newMappingDataFile);
}
- defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, newWrapper.getBrokerToMapIn(), newWrapper.getBrokerToMapOut(), newWrapper.getBrokerConfigMap(), 10000, false);
+ MQAdminUtils.completeNoTargetBrokers(newWrapper.getBrokerConfigMap(), defaultMQAdminExt);
+
+ MQAdminUtils.remappingStaticTopic(topic, newWrapper.getBrokerToMapIn(), newWrapper.getBrokerToMapOut(), newWrapper.getBrokerConfigMap(), 10000, false, defaultMQAdminExt);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index a32dc8d..958922e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
@@ -106,14 +107,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
- ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- if (clusterInfo == null
- || clusterInfo.getClusterAddrTable().isEmpty()) {
- throw new RuntimeException("The Cluster info is empty");
- }
- clientMetadata.refreshClusterInfo(clusterInfo);
-
- doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force);
+ MQAdminUtils.completeNoTargetBrokers(wrapper.getBrokerConfigMap(), defaultMQAdminExt);
+ MQAdminUtils.updateTopicConfigMappingAll(wrapper.getBrokerConfigMap(), defaultMQAdminExt, false);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
@@ -122,22 +117,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
}
- public void doUpdate(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
- //check it before
- for (String broker : brokerConfigMap.keySet()) {
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- if (addr == null) {
- throw new RuntimeException("Can't find addr for broker " + broker);
- }
- }
- //If some succeed, and others fail, it will cause inconsistent data
- for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
- String broker = entry.getKey();
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- TopicConfigAndQueueMapping configMapping = entry.getValue();
- defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
- }
- }
@Override
@@ -155,7 +134,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
- ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
Set<String> targetBrokers = new HashSet<>();
@@ -173,7 +151,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
- clientMetadata.refreshClusterInfo(clusterInfo);
{
if (commandLine.hasOption("b")) {
String brokerStrs = commandLine.getOptionValue("b").trim();
@@ -192,17 +169,11 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
if (targetBrokers.isEmpty()) {
throw new RuntimeException("Find none brokers, do nothing");
}
- for (String broker : targetBrokers) {
- String addr = clientMetadata.findMasterBrokerAddr(broker);
- if (addr == null) {
- throw new RuntimeException("Can't find addr for broker " + broker);
- }
- }
}
//get the existed topic config and mapping
- brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+ brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
@@ -219,15 +190,15 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
targetBrokers.addAll(brokerConfigMap.keySet());
//calculate the new data
- TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);
+ TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
{
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
System.out.println("The new mapping data is written to file " + newMappingDataFile);
}
- doUpdate(newWrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false);
-
+ MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
+ MQAdminUtils.updateTopicConfigMappingAll(brokerConfigMap, defaultMQAdminExt, false);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {