You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/23 13:16:36 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Catch the exception
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new fa39815 Catch the exception
fa39815 is described below
commit fa39815666187647b6990af7a8a98e1779eb065d
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 23 21:16:26 2021 +0800
Catch the exception
---
.../broker/processor/AdminBrokerProcessor.java | 3 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 16 +++--
.../apache/rocketmq/common/rpc/ClientMetadata.java | 4 ++
.../apache/rocketmq/test/smoke/StaticTopicIT.java | 2 +-
.../tools/admin/DefaultMQAdminExtImpl.java | 72 +++++++++++++++++-----
.../apache/rocketmq/tools/admin/MQAdminExt.java | 8 +--
6 files changed, 77 insertions(+), 28 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 3eceb49..0341851 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1880,7 +1880,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic());
if (topicConfig == null) {
log.error("No topic in this broker, client: {} topic: {}", ctx.channel().remoteAddress(), requestHeader.getTopic());
- response.setCode(ResponseCode.SYSTEM_ERROR);
+ //be careful with the response code: "not-exist" must be set explicitly
+ response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic in this broker. topic: " + requestHeader.getTopic());
return response;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 050fc30..b229283 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -332,7 +332,7 @@ public class MQClientAPIImpl {
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
@@ -2708,7 +2708,7 @@ public class MQClientAPIImpl {
public TopicConfigAndQueueMapping getTopicConfig(final String brokerAddr, String topic,
long timeoutMillis) throws InterruptedException,
- RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic);
header.setWithMapping(true);
@@ -2720,15 +2720,19 @@ public class MQClientAPIImpl {
case ResponseCode.SUCCESS: {
return RemotingSerializable.decode(response.getBody(), TopicConfigAndQueueMapping.class);
}
+ //should check whether the topic exists
+ case ResponseCode.TOPIC_NOT_EXIST: {
+ //should return null?
+ break;
+ }
default:
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQClientException(response.getCode(), response.getRemark());
}
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, boolean force,
- final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
@@ -2753,6 +2757,6 @@ public class MQClientAPIImpl {
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQClientException(response.getCode(), response.getRemark());
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
index 499b6a8..078f616 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -91,6 +91,10 @@ public class ClientMetadata {
return brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID);
}
+ public ConcurrentMap<String, HashMap<Long, String>> getBrokerAddrTable() {
+ return brokerAddrTable;
+ }
+
public static ConcurrentMap<MessageQueue, String> topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) {
if (route.getTopicQueueMappingByBroker() == null
|| route.getTopicQueueMappingByBroker().isEmpty()) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index 9dd116d..bb21dc2 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -33,13 +33,13 @@ public class StaticTopicIT extends BaseConf {
defaultMQAdminExt = getAdmin(nsAddr);
waitBrokerRegistered(nsAddr, clusterName);
clientMetadata = new ClientMetadata();
+ defaultMQAdminExt.start();
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
-
}
@Test
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 498e835..d805f8d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -211,8 +211,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException {
+ public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}
@@ -258,7 +257,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis);
}
@@ -1171,20 +1170,63 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- TopicRouteData routeData = examineTopicRouteInfo(topic);
- clientMetadata.freshTopicRoute(topic, routeData);
+ public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
+ try {
+ TopicRouteData routeData = examineTopicRouteInfo(topic);
+ clientMetadata.freshTopicRoute(topic, routeData);
+ if (routeData != null
+ && !routeData.getQueueDatas().isEmpty()) {
+ for (QueueData queueData: routeData.getQueueDatas()) {
+ String bname = queueData.getBrokerName();
+ String addr = clientMetadata.findMasterBrokerAddr(bname);
+ try {
+ TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
+ //allow the config to be null
+ if (mapping != null) {
+ brokerConfigMap.put(bname, mapping);
+ }
+ } catch (MQClientException exception) {
+ if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+ throw exception;
+ }
+
+ }
+
+ }
+ }
+ } catch (MQClientException exception) {
+ if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+ throw exception;
+ }
+ log.info("The topic {} dose not exist in nameserver, so check it from all brokers", topic);
+ //if the topic cannot be fetched from the nameserver, check all the brokers
+ try {
+ ClusterInfo clusterInfo = examineBrokerClusterInfo();
+ if (clusterInfo != null
+ && clusterInfo.getBrokerAddrTable() != null) {
+ clientMetadata.refreshClusterInfo(clusterInfo);
+ }
+ }catch (MQBrokerException e) {
+ throw new MQClientException(e.getResponseCode(), e.getMessage());
+ }
+ for (Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
+ String bname = entry.getKey();
+ HashMap<Long, String> map = entry.getValue();
+ String addr = map.get(MixAll.MASTER_ID);
+ if (addr != null) {
+ try {
+ TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
+ //allow the config to be null
+ if (mapping != null) {
+ brokerConfigMap.put(bname, mapping);
+ }
+ } catch (MQClientException clientException) {
+ if (clientException.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+ throw clientException;
+ }
+ }
- if (routeData != null
- && !routeData.getQueueDatas().isEmpty()) {
- for (QueueData queueData: routeData.getQueueDatas()) {
- String bname = queueData.getBrokerName();
- String addr = clientMetadata.findMasterBrokerAddr(bname);
- TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
- //allow the config is null
- if (mapping != null) {
- brokerConfigMap.put(bname, mapping);
}
}
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index f01ccc3..60a366d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -109,7 +109,7 @@ public interface MQAdminExt extends MQAdmin {
SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
TopicConfig examineTopicConfig(final String addr,
- final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+ final String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
TopicStatsTable examineTopicStats(
final String topic) throws RemotingException, MQClientException, InterruptedException,
@@ -344,11 +344,9 @@ public interface MQAdminExt extends MQAdmin {
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
- void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException;
+ void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQClientException;
- Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException;
+ Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException;
void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;