You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/23 13:16:36 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Catch the exception

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new fa39815  Catch the exception
fa39815 is described below

commit fa39815666187647b6990af7a8a98e1779eb065d
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 23 21:16:26 2021 +0800

    Catch the exception
---
 .../broker/processor/AdminBrokerProcessor.java     |  3 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 16 +++--
 .../apache/rocketmq/common/rpc/ClientMetadata.java |  4 ++
 .../apache/rocketmq/test/smoke/StaticTopicIT.java  |  2 +-
 .../tools/admin/DefaultMQAdminExtImpl.java         | 72 +++++++++++++++++-----
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |  8 +--
 6 files changed, 77 insertions(+), 28 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 3eceb49..0341851 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1880,7 +1880,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic());
         if (topicConfig == null) {
             log.error("No topic in this broker, client: {} topic: {}", ctx.channel().remoteAddress(), requestHeader.getTopic());
-            response.setCode(ResponseCode.SYSTEM_ERROR);
+            //be careful with the response code: "not-exist" must be set explicitly
+            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
             response.setRemark("No topic in this broker. topic: " + requestHeader.getTopic());
             return response;
         }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 050fc30..b229283 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -332,7 +332,7 @@ public class MQClientAPIImpl {
 
     public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
         final long timeoutMillis)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, InterruptedException, MQClientException {
         CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
         requestHeader.setTopic(topicConfig.getTopicName());
         requestHeader.setDefaultTopic(defaultTopic);
@@ -2708,7 +2708,7 @@ public class MQClientAPIImpl {
 
     public TopicConfigAndQueueMapping getTopicConfig(final String brokerAddr, String topic,
         long timeoutMillis) throws InterruptedException,
-        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
         GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
         header.setTopic(topic);
         header.setWithMapping(true);
@@ -2720,15 +2720,19 @@ public class MQClientAPIImpl {
             case ResponseCode.SUCCESS: {
                 return RemotingSerializable.decode(response.getBody(), TopicConfigAndQueueMapping.class);
             }
+            //should check whether the topic exists
+            case ResponseCode.TOPIC_NOT_EXIST: {
+                //should return null?
+                break;
+            }
             default:
                 break;
         }
-        throw new MQBrokerException(response.getCode(), response.getRemark());
+        throw new MQClientException(response.getCode(), response.getRemark());
     }
 
     public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, boolean force,
-                            final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+                            final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
         CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
         requestHeader.setTopic(topicConfig.getTopicName());
         requestHeader.setDefaultTopic(defaultTopic);
@@ -2753,6 +2757,6 @@ public class MQClientAPIImpl {
                 break;
         }
 
-        throw new MQBrokerException(response.getCode(), response.getRemark());
+        throw new MQClientException(response.getCode(), response.getRemark());
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
index 499b6a8..078f616 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -91,6 +91,10 @@ public class ClientMetadata {
         return brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID);
     }
 
+    public ConcurrentMap<String, HashMap<Long, String>> getBrokerAddrTable() {
+        return brokerAddrTable;
+    }
+
     public static ConcurrentMap<MessageQueue, String> topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) {
         if (route.getTopicQueueMappingByBroker() == null
                 || route.getTopicQueueMappingByBroker().isEmpty()) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index 9dd116d..bb21dc2 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -33,13 +33,13 @@ public class StaticTopicIT extends BaseConf {
         defaultMQAdminExt = getAdmin(nsAddr);
         waitBrokerRegistered(nsAddr, clusterName);
         clientMetadata = new ClientMetadata();
+        defaultMQAdminExt.start();
         ClusterInfo clusterInfo  = defaultMQAdminExt.examineBrokerClusterInfo();
         if (clusterInfo == null
                 || clusterInfo.getClusterAddrTable().isEmpty()) {
             throw new RuntimeException("The Cluster info is empty");
         }
         clientMetadata.refreshClusterInfo(clusterInfo);
-
     }
 
     @Test
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 498e835..d805f8d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -211,8 +211,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     }
 
     @Override
-    public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
-        InterruptedException, MQClientException {
+    public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, InterruptedException, MQClientException {
         this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
     }
 
@@ -258,7 +257,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     }
 
     @Override
-    public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+    public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
         return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis);
     }
 
@@ -1171,20 +1170,63 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     }
 
     @Override
-    public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        TopicRouteData routeData = examineTopicRouteInfo(topic);
-        clientMetadata.freshTopicRoute(topic, routeData);
+    public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException {
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
+        try {
+            TopicRouteData routeData = examineTopicRouteInfo(topic);
+            clientMetadata.freshTopicRoute(topic, routeData);
+            if (routeData != null
+                    && !routeData.getQueueDatas().isEmpty()) {
+                for (QueueData queueData: routeData.getQueueDatas()) {
+                    String bname = queueData.getBrokerName();
+                    String addr = clientMetadata.findMasterBrokerAddr(bname);
+                    try {
+                        TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
+                        //allow the config to be null
+                        if (mapping != null) {
+                            brokerConfigMap.put(bname, mapping);
+                        }
+                    } catch (MQClientException exception) {
+                        if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+                            throw exception;
+                        }
+
+                    }
+
+                }
+            }
+        } catch (MQClientException  exception) {
+            if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+                throw exception;
+            }
+            log.info("The topic {} dose not exist in nameserver, so check it from all brokers", topic);
+            //if cannot get from nameserver, then check all the brokers
+            try {
+                ClusterInfo clusterInfo = examineBrokerClusterInfo();
+                if (clusterInfo != null
+                        && clusterInfo.getBrokerAddrTable() != null) {
+                    clientMetadata.refreshClusterInfo(clusterInfo);
+                }
+            }catch (MQBrokerException e) {
+                throw new MQClientException(e.getResponseCode(), e.getMessage());
+            }
+            for (Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
+                String bname = entry.getKey();
+                HashMap<Long, String> map = entry.getValue();
+                String addr = map.get(MixAll.MASTER_ID);
+                if (addr != null) {
+                    try {
+                        TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
+                        //allow the config to be null
+                        if (mapping != null) {
+                            brokerConfigMap.put(bname, mapping);
+                        }
+                    }  catch (MQClientException clientException) {
+                        if (clientException.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+                            throw clientException;
+                        }
+                    }
 
-        if (routeData != null
-                && !routeData.getQueueDatas().isEmpty()) {
-            for (QueueData queueData: routeData.getQueueDatas()) {
-                String bname = queueData.getBrokerName();
-                String addr = clientMetadata.findMasterBrokerAddr(bname);
-                TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
-                //allow the config is null
-                if (mapping != null) {
-                    brokerConfigMap.put(bname, mapping);
                 }
             }
         }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index f01ccc3..60a366d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -109,7 +109,7 @@ public interface MQAdminExt extends MQAdmin {
     SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
 
     TopicConfig examineTopicConfig(final String addr,
-        final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+        final String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
 
     TopicStatsTable examineTopicStats(
         final String topic) throws RemotingException, MQClientException, InterruptedException,
@@ -344,11 +344,9 @@ public interface MQAdminExt extends MQAdmin {
         LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
 
 
-    void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, MQBrokerException,
-            InterruptedException, MQClientException;
+    void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQClientException;
 
-    Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException,
-            InterruptedException, MQClientException;
+    Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException;
 
     void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;