You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/24 06:40:12 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the json problem
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 6c64028 Polish the json problem
6c64028 is described below
commit 6c6402868984356136335e437abcadb6c08c95d9
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Nov 24 14:39:52 2021 +0800
Polish the json problem
---
.../broker/processor/AdminBrokerProcessor.java | 7 ++-
.../apache/rocketmq/client/impl/MQAdminImpl.java | 17 -------
.../rocketmq/client/impl/MQClientAPIImpl.java | 8 ++--
.../protocol/body/TopicQueueMappingBody.java | 51 --------------------
.../common/statictopic/LogicQueueMappingItem.java | 12 +++++
.../statictopic/TopicQueueMappingDetail.java | 11 +++++
.../common/statictopic/TopicQueueMappingInfo.java | 4 ++
.../common/statictopic/TopicQueueMappingTest.java | 55 +++++++++++++---------
.../apache/rocketmq/test/smoke/StaticTopicIT.java | 1 +
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 6 +--
.../tools/admin/DefaultMQAdminExtImpl.java | 52 ++++++++++----------
.../apache/rocketmq/tools/admin/MQAdminExt.java | 9 ++--
12 files changed, 105 insertions(+), 128 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 0341851..443ae46 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -313,7 +313,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
- final TopicQueueMappingBody topicQueueMappingBody = RemotingSerializable.decode(request.getBody(), TopicQueueMappingBody.class);
+ final TopicQueueMappingDetail topicQueueMappingDetail = RemotingSerializable.decode(request.getBody(), TopicQueueMappingDetail.class);
String topic = requestHeader.getTopic();
@@ -338,7 +338,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
try {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
- this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail(), force);
+ System.out.println("Broker body:" + new String(request.getBody()));
+ System.out.println("Broker bodetaildy:" + topicQueueMappingDetail.toJson());
+
+ this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force);
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
response.setCode(ResponseCode.SUCCESS);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 7d539cd..2946588 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -80,23 +80,6 @@ public class MQAdminImpl {
this.timeoutMillis = timeoutMillis;
}
- public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, boolean force) throws MQClientException {
- MQClientException exception = null;
- for (int i = 0; i < 3; i++) {
- try {
- this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force, timeoutMillis);
- break;
- } catch (Exception e) {
- if (2 == i) {
- exception = new MQClientException("create topic to broker exception", e);
- }
- }
- }
- if (exception != null) {
- throw exception;
- }
- }
-
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index b229283..d57532c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -2708,7 +2708,7 @@ public class MQClientAPIImpl {
public TopicConfigAndQueueMapping getTopicConfig(final String brokerAddr, String topic,
long timeoutMillis) throws InterruptedException,
- RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
+ RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic);
header.setWithMapping(true);
@@ -2728,11 +2728,11 @@ public class MQClientAPIImpl {
default:
break;
}
- throw new MQClientException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark());
}
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, boolean force,
- final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
+ final long timeoutMillis) throws RemotingException, InterruptedException, MQBrokerException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
@@ -2757,6 +2757,6 @@ public class MQClientAPIImpl {
break;
}
- throw new MQClientException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark());
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
deleted file mode 100644
index 7e8918f..0000000
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.common.protocol.body;
-
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
-public class TopicQueueMappingBody extends RemotingSerializable {
-
- private boolean force;
- private int prevGen;
- private TopicQueueMappingDetail mappingDetail;
-
- public int getPrevGen() {
- return prevGen;
- }
-
- public void setPrevGen(int prevGen) {
- this.prevGen = prevGen;
- }
-
- public TopicQueueMappingDetail getMappingDetail() {
- return mappingDetail;
- }
-
- public void setMappingDetail(TopicQueueMappingDetail mappingDetail) {
- this.mappingDetail = mappingDetail;
- }
-
- public boolean isForce() {
- return force;
- }
-
- public void setForce(boolean force) {
- this.force = force;
- }
-}
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index 479f75d..b87d2f1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -100,6 +100,18 @@ public class LogicQueueMappingItem {
this.logicOffset = logicOffset;
}
+ public void setEndOffset(long endOffset) {
+ this.endOffset = endOffset;
+ }
+
+ public void setTimeOfStart(long timeOfStart) {
+ this.timeOfStart = timeOfStart;
+ }
+
+ public void setTimeOfEnd(long timeOfEnd) {
+ this.timeOfEnd = timeOfEnd;
+ }
+
@Override
public String toString() {
return "LogicQueueMappingItem{" +
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
index 7117cad..b80aa9d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
@@ -29,6 +29,13 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// make sure this value is not null
private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+
+
+ public TopicQueueMappingDetail() {
+
+ }
+
+
public TopicQueueMappingDetail(String topic, int totalQueues, String bname, long epoch) {
super(topic, totalQueues, bname, epoch);
buildIdMap();
@@ -120,6 +127,10 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
return hostedQueues;
}
+ public void setHostedQueues(ConcurrentMap<Integer, ImmutableList<LogicQueueMappingItem>> hostedQueues) {
+ this.hostedQueues = hostedQueues;
+ }
+
public boolean checkIfAsPhysical(Integer globalId) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
return mappingItems == null
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
index f6122c0..39747b3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
@@ -32,6 +32,10 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
//register to broker to construct the route
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
+ public TopicQueueMappingInfo() {
+
+ }
+
public TopicQueueMappingInfo(String topic, int totalQueues, String bname, long epoch) {
this.topic = topic;
this.totalQueues = totalQueues;
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
index a1b3d27..b0cc5dd 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
@@ -4,8 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
-import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Assert;
import org.junit.Test;
@@ -20,33 +19,45 @@ public class TopicQueueMappingTest {
System.out.println(File.separator);
}
-
@Test
public void testJsonSerialize() {
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L);
String mappingItemJson = JSON.toJSONString(mappingItem) ;
- System.out.println(mappingItemJson);
-
- Map<String, Object> mappingItemMap = JSON.parseObject(mappingItemJson, Map.class);
- Assert.assertEquals(8, mappingItemMap.size());
- Assert.assertEquals(mappingItemMap.get("bname"), mappingItem.getBname());
- Assert.assertEquals(mappingItemMap.get("gen"), mappingItem.getGen());
- Assert.assertEquals(mappingItemMap.get("logicOffset"), mappingItem.getLogicOffset());
- Assert.assertEquals(mappingItemMap.get("queueId"), mappingItem.getQueueId());
- Assert.assertEquals(mappingItemMap.get("startOffset"), mappingItem.getStartOffset());
- Assert.assertEquals(mappingItemMap.get("endOffset"), mappingItem.getEndOffset());
- Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart());
- Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd());
-
+ {
+ Map<String, Object> mappingItemMap = JSON.parseObject(mappingItemJson, Map.class);
+ Assert.assertEquals(8, mappingItemMap.size());
+ Assert.assertEquals(mappingItemMap.get("bname"), mappingItem.getBname());
+ Assert.assertEquals(mappingItemMap.get("gen"), mappingItem.getGen());
+ Assert.assertEquals(mappingItemMap.get("logicOffset"), mappingItem.getLogicOffset());
+ Assert.assertEquals(mappingItemMap.get("queueId"), mappingItem.getQueueId());
+ Assert.assertEquals(mappingItemMap.get("startOffset"), mappingItem.getStartOffset());
+ Assert.assertEquals(mappingItemMap.get("endOffset"), mappingItem.getEndOffset());
+ Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart());
+ Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd());
+ }
+ {
+ String mappingItemJson2 = RemotingSerializable.toJson(RemotingSerializable.decode(mappingItemJson.getBytes(), LogicQueueMappingItem.class), false);
+ Assert.assertEquals(mappingItemJson, mappingItemJson2);
+ }
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis());
mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem));
String mappingDetailJson = JSON.toJSONString(mappingDetail);
- Map mappingDetailMap = JSON.parseObject(mappingDetailJson);
- Assert.assertFalse(mappingDetailMap.containsKey("prevIdMap"));
- Assert.assertFalse(mappingDetailMap.containsKey("currIdMap"));
- Assert.assertEquals(4, mappingDetailMap.size());
- Assert.assertEquals(1, ((JSONObject) mappingDetailMap.get("hostedQueues")).size());
- Assert.assertEquals(1, ((JSONArray)((JSONObject) mappingDetailMap.get("hostedQueues")).get("0")).size());
+ {
+ Map mappingDetailMap = JSON.parseObject(mappingDetailJson);
+ Assert.assertFalse(mappingDetailMap.containsKey("prevIdMap"));
+ Assert.assertFalse(mappingDetailMap.containsKey("currIdMap"));
+ Assert.assertEquals(6, mappingDetailMap.size());
+ Assert.assertEquals(1, ((JSONObject) mappingDetailMap.get("hostedQueues")).size());
+ Assert.assertEquals(1, ((JSONArray)((JSONObject) mappingDetailMap.get("hostedQueues")).get("0")).size());
+ }
+ {
+ System.out.println(mappingDetailJson);
+ TopicQueueMappingDetail detailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class);
+ System.out.println(JSON.toJSONString(detailFromJson));
+
+ //Assert.assertEquals(1, detailFromJson.getHostedQueues().size());
+ //Assert.assertEquals(1, detailFromJson.getHostedQueues().get("0").size());
+ }
}
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index bb21dc2..dac33d0 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -57,6 +57,7 @@ public class StaticTopicIT extends BaseConf {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
+ System.out.println(configMapping.getMappingDetail().toJson());
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
}
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 1913612..0c93a7a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -218,12 +218,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
@Override
public TopicConfig examineTopicConfig(String addr,
- String topic) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
+ String topic) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, InterruptedException, MQBrokerException {
return defaultMQAdminExtImpl.examineTopicConfig(addr, topic);
}
@Override
- public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException {
return this.defaultMQAdminExtImpl.examineTopicConfigAll(clientMetadata, topic);
}
@@ -668,7 +668,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail, boolean force) throws RemotingException, InterruptedException, MQBrokerException {
this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index d805f8d..3c491cd 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -211,7 +211,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, InterruptedException, MQClientException {
+ public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}
@@ -257,7 +258,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis);
}
@@ -1105,8 +1106,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws MQClientException {
- this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
+ public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQBrokerException {
+ this.mqClientInstance.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force, timeoutMillis);
}
@@ -1170,13 +1171,23 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException {
+ public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
+ boolean getFromBrokers = false;
+ TopicRouteData routeData = null;
try {
- TopicRouteData routeData = examineTopicRouteInfo(topic);
- clientMetadata.freshTopicRoute(topic, routeData);
+ routeData = examineTopicRouteInfo(topic);
+ } catch (MQClientException exception) {
+ if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+ throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage());
+ } else {
+ getFromBrokers = true;
+ }
+ }
+ if (!getFromBrokers) {
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
+ clientMetadata.freshTopicRoute(topic, routeData);
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
@@ -1186,29 +1197,21 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
- } catch (MQClientException exception) {
+ } catch (MQBrokerException exception) {
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw exception;
}
-
}
}
}
- } catch (MQClientException exception) {
- if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
- throw exception;
- }
+ } else {
log.info("The topic {} dose not exist in nameserver, so check it from all brokers", topic);
//if cannot get from nameserver, then check all the brokers
- try {
- ClusterInfo clusterInfo = examineBrokerClusterInfo();
- if (clusterInfo != null
- && clusterInfo.getBrokerAddrTable() != null) {
- clientMetadata.refreshClusterInfo(clusterInfo);
- }
- }catch (MQBrokerException e) {
- throw new MQClientException(e.getResponseCode(), e.getMessage());
+ ClusterInfo clusterInfo = examineBrokerClusterInfo();
+ if (clusterInfo != null
+ && clusterInfo.getBrokerAddrTable() != null) {
+ clientMetadata.refreshClusterInfo(clusterInfo);
}
for (Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
String bname = entry.getKey();
@@ -1221,12 +1224,11 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
- } catch (MQClientException clientException) {
- if (clientException.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
- throw clientException;
+ } catch (MQBrokerException exception1) {
+ if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+ throw exception1;
}
}
-
}
}
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 60a366d..6414a9b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -108,8 +108,6 @@ public interface MQAdminExt extends MQAdmin {
SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
- TopicConfig examineTopicConfig(final String addr,
- final String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
TopicStatsTable examineTopicStats(
final String topic) throws RemotingException, MQClientException, InterruptedException,
@@ -344,9 +342,12 @@ public interface MQAdminExt extends MQAdmin {
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
- void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQClientException;
+ TopicConfig examineTopicConfig(final String addr,
+ final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+
+ void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQBrokerException;
- Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException;
+ Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException;
void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;