You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/09 11:56:16 UTC

[rocketmq] 01/02: Convert mq to broker name

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit f308cd308ec0b767c5e36320e6bbdb99c526e677
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 9 16:41:10 2021 +0800

    Convert mq to broker name
---
 .../apache/rocketmq/client/impl/MQAdminImpl.java   | 17 ++++++-------
 .../impl/consumer/DefaultMQPushConsumerImpl.java   | 13 +++++++---
 .../client/impl/factory/MQClientInstance.java      | 29 ++++++----------------
 .../impl/producer/DefaultMQProducerImpl.java       | 20 ++++-----------
 4 files changed, 29 insertions(+), 50 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 04492e3..ce4c94a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -41,7 +41,6 @@ import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.InvokeCallback;
@@ -191,10 +190,10 @@ public class MQAdminImpl {
         if (logicalQueueRouteData != null) {
             mq = logicalQueueRouteData.getMessageQueue();
         }
-        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
         if (null == brokerAddr) {
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
         }
 
         if (brokerAddr != null) {
@@ -227,10 +226,10 @@ public class MQAdminImpl {
                 previousQueueRouteData = maxQueueRouteData;
                 mq = maxQueueRouteData.getMessageQueue();
             }
-            String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+            String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
             if (null == brokerAddr) {
                 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
-                brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+                brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
             }
 
             if (brokerAddr != null) {
@@ -256,10 +255,10 @@ public class MQAdminImpl {
             mq = minQueueRouteData.getMessageQueue();
         }
 
-        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
         if (null == brokerAddr) {
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
         }
 
         if (brokerAddr != null) {
@@ -298,10 +297,10 @@ public class MQAdminImpl {
             mq = minQueueRouteData.getMessageQueue();
         }
 
-        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
         if (null == brokerAddr) {
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
         }
 
         if (brokerAddr != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index d236646..dafa555 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -718,11 +718,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)
         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         try {
+            String desBrokerName = brokerName;
+            if (mq != null) {
+                String tmpBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
+                if (tmpBrokerName != null) {
+                    desBrokerName = tmpBrokerName;
+                }
+            }
             String brokerAddr = null;
-            if (null != mq) {
-                brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
-            } else if (null != brokerName) {
-                brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
+            if (null != desBrokerName) {
+                brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(desBrokerName);
             } else {
                 RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
             }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index a7d521f..950eae7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1055,16 +1055,11 @@ public class MQClientInstance {
         return mq.getBrokerName();
     }
 
-    public FindBrokerResult findBrokerAddressInAdmin(final MessageQueue mq) {
-        String brokerName = getBrokerNameFromMessageQueue(mq);
+
+    public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
         if (brokerName == null) {
             return null;
-        } else {
-            return findBrokerAddressInAdmin(brokerName);
         }
-    }
-
-    public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
         String brokerAddr = null;
         boolean slave = false;
         boolean found = false;
@@ -1094,16 +1089,11 @@ public class MQClientInstance {
         return null;
     }
 
-    public String findBrokerAddressInPublish(final MessageQueue mq) {
-        String brokerName = getBrokerNameFromMessageQueue(mq);
+
+    public String findBrokerAddressInPublish(final String brokerName) {
         if (brokerName == null) {
             return null;
-        } else {
-            return findBrokerAddressInPublish(brokerName);
         }
-    }
-    //This is used for retry only
-    public String findBrokerAddressInPublish(final String brokerName) {
         HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
         if (map != null && !map.isEmpty()) {
             return map.get(MixAll.MASTER_ID);
@@ -1112,20 +1102,15 @@ public class MQClientInstance {
         return null;
     }
 
-    public FindBrokerResult findBrokerAddressInSubscribe(final MessageQueue mq, final long brokerId, final boolean onlyThisBroker) {
-        String brokerName = getBrokerNameFromMessageQueue(mq);
-        if (brokerName == null) {
-            return null;
-        } else {
-            return findBrokerAddressInSubscribe(brokerName, brokerId, onlyThisBroker);
-        }
-    }
 
     public FindBrokerResult findBrokerAddressInSubscribe(
         final String brokerName,
         final long brokerId,
         final boolean onlyThisBroker
     ) {
+        if (brokerName == null) {
+            return null;
+        }
         String brokerAddr = null;
         boolean slave = false;
         boolean found = false;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 578c843..52a2d9c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -16,16 +16,12 @@
  */
 package org.apache.rocketmq.client.impl.producer;
 
-import com.alibaba.fastjson.JSON;
-import com.google.common.base.Objects;
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -45,7 +41,6 @@ import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.common.ClientErrorCode;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.exception.MQRedirectException;
 import org.apache.rocketmq.client.exception.RequestTimeoutException;
 import org.apache.rocketmq.client.hook.CheckForbiddenContext;
 import org.apache.rocketmq.client.hook.CheckForbiddenHook;
@@ -67,7 +62,6 @@ import org.apache.rocketmq.client.producer.RequestFutureTable;
 import org.apache.rocketmq.client.producer.RequestResponseFuture;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendResultForLogicalQueue;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.client.producer.TransactionCheckListener;
 import org.apache.rocketmq.client.producer.TransactionListener;
@@ -88,13 +82,9 @@ import org.apache.rocketmq.common.message.MessageId;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.message.MessageType;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
 import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.utils.CorrelationIdUtil;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -731,14 +721,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         final TopicPublishInfo topicPublishInfo,
         final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         long beginStartTime = System.currentTimeMillis();
-        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+        String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
+        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
         if (null == brokerAddr) {
             tryToFindTopicPublishInfo(mq.getTopic());
-            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
         }
 
-        String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
-
         SendMessageContext context = null;
         if (brokerAddr != null) {
             brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
@@ -1342,7 +1331,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
         }
         String transactionId = sendResult.getTransactionId();
-        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue());
+        final String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(defaultMQProducer.queueWithNamespace(sendResult.getMessageQueue()));
+        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName);
         EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
         requestHeader.setTransactionId(transactionId);
         requestHeader.setCommitLogOffset(id.getOffset());