You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/09 11:56:17 UTC

[rocketmq] 02/02: Finish the findBrokerAddr for admin publish subscribe

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 08618d512ef1c3e711c65447571eb4ba7962a744
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 9 19:53:53 2021 +0800

    Finish the findBrokerAddr for admin publish subscribe
---
 .../consumer/store/RemoteBrokerOffsetStore.java    |  8 +++---
 .../impl/consumer/DefaultMQPullConsumerImpl.java   |  6 ++++-
 .../impl/consumer/DefaultMQPushConsumerImpl.java   | 29 +++++++++++++++++-----
 .../client/impl/consumer/PullAPIWrapper.java       |  8 +++---
 .../client/impl/consumer/RebalanceImpl.java        |  7 +++---
 .../impl/producer/DefaultMQProducerImpl.java       |  6 ++---
 6 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 6b76238..7364856 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -199,10 +199,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
     @Override
     public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
         MQBrokerException, InterruptedException, MQClientException {
-        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
         if (null == findBrokerResult) {
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
         }
 
         if (findBrokerResult != null) {
@@ -226,11 +226,11 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
 
     private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException {
-        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
         if (null == findBrokerResult) {
 
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
         }
 
         if (findBrokerResult != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index d04b040..89ce7db 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -578,7 +578,11 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup)
         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         try {
-            String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
+            String destBrokerName = brokerName;
+            if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(destBrokerName)) {
+                destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPullConsumer.queueWithNamespace(new MessageQueue(msg.getTopic(), msg.getBrokerName(), msg.getQueueId())));
+            }
+            String brokerAddr = (null != destBrokerName) ? this.mQClientFactory.findBrokerAddressInPublish(destBrokerName)
                 : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
 
             if (UtilAll.isBlank(consumerGroup)) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index dafa555..d00c4e8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -725,6 +725,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                     desBrokerName = tmpBrokerName;
                 }
             }
+            if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(desBrokerName)) {
+                desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(msg.getTopic(), msg.getBrokerName(), msg.getQueueId())));
+            }
+
             String brokerAddr = null;
             if (null != desBrokerName) {
                 brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(desBrokerName);
@@ -765,15 +769,21 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
             long queueOffset = ExtraInfoUtil.getQueueOffset(extraInfoStrs);
             String topic = message.getTopic();
 
+            String desBrokerName = brokerName;
+            if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName)) {
+                desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(topic, brokerName, queueId)));
+            }
+
+
             FindBrokerResult
-                findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+                findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true);
             if (null == findBrokerResult) {
                 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
-                findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+                findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true);
             }
 
             if (findBrokerResult == null) {
-                log.error("The broker[" + brokerName + "] not exist");
+                log.error("The broker[" + desBrokerName + "] not exist");
                 return;
             }
 
@@ -806,11 +816,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
         String brokerName = ExtraInfoUtil.getBrokerName(extraInfoStrs);
         int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs);
+
+        String desBrokerName = brokerName;
+        if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName)) {
+            desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(topic, brokerName, queueId)));
+        }
+
         FindBrokerResult
-            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true);
         if (null == findBrokerResult) {
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
-            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true);
         }
         if (findBrokerResult != null) {
             ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader();
@@ -820,10 +836,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
             requestHeader.setConsumerGroup(consumerGroup);
             requestHeader.setExtraInfo(extraInfo);
             requestHeader.setInvisibleTime(invisibleTime);
+            //here the broker should be polished
             this.mQClientFactory.getMQClientAPIImpl().changeInvisibleTimeAsync(brokerName, findBrokerResult.getBrokerAddr(), requestHeader, ASYNC_TIMEOUT, callback);
             return;
         }
-        throw new MQClientException("The broker[" + brokerName + "] not exist", null);
+        throw new MQClientException("The broker[" + desBrokerName + "] not exist", null);
     }
 
     public int getMaxReconsumeTimes() {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index a379b1c..6d966a6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -239,12 +239,12 @@ public class PullAPIWrapper {
         int queueId = mq.getQueueId();
 
         FindBrokerResult findBrokerResult =
-            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
+            this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
                 this.recalculatePullFromWhichNode(mq), false);
         if (null == findBrokerResult) {
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
             findBrokerResult =
-                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
+                this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
                     this.recalculatePullFromWhichNode(mq), false);
         }
 
@@ -373,10 +373,10 @@ public class PullAPIWrapper {
     public void popAsync(MessageQueue mq, long invisibleTime, int maxNums, String consumerGroup,
                          long timeout, PopCallback popCallback, boolean poll, int initMode, boolean order, String expressionType, String expression)
         throws MQClientException, RemotingException, InterruptedException {
-        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
         if (null == findBrokerResult) {
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
         }
         if (findBrokerResult != null) {
             PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index ab0d885..5788d1c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -76,7 +76,7 @@ public abstract class RebalanceImpl {
     }
 
     public void unlock(final MessageQueue mq, final boolean oneway) {
-        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
         if (findBrokerResult != null) {
             UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
             requestBody.setConsumerGroup(this.consumerGroup);
@@ -141,7 +141,8 @@ public abstract class RebalanceImpl {
                 continue;
             }
 
-            Set<MessageQueue> mqs = result.get(mq.getBrokerName());
+            String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
+            Set<MessageQueue> mqs = result.get(destBrokerName);
             if (null == mqs) {
                 mqs = new HashSet<MessageQueue>();
                 result.put(mq.getBrokerName(), mqs);
@@ -154,7 +155,7 @@ public abstract class RebalanceImpl {
     }
 
     public boolean lock(final MessageQueue mq) {
-        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
         if (findBrokerResult != null) {
             LockBatchRequestBody requestBody = new LockBatchRequestBody();
             requestBody.setConsumerGroup(this.consumerGroup);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 52a2d9c..7719597 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -721,11 +721,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         final TopicPublishInfo topicPublishInfo,
         final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         long beginStartTime = System.currentTimeMillis();
-        String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
-        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
+        String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
+        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName);
         if (null == brokerAddr) {
             tryToFindTopicPublishInfo(mq.getTopic());
-            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
+            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName);
         }
 
         SendMessageContext context = null;