You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2021/12/08 11:30:59 UTC

[rocketmq] branch develop updated: fix(client): fetch and commit offset use master broker firstly

This is an automated email from the ASF dual-hosted git repository.

zhangyang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e2a4b37  fix(client): fetch and commit offset use master broker firstly
e2a4b37 is described below

commit e2a4b3778519ef761dac9ce4963eb20905fb35f4
Author: lushilin <lu...@bytedance.com>
AuthorDate: Wed Dec 8 15:58:42 2021 +0800

    fix(client): fetch and commit offset use master broker firstly
---
 .../consumer/store/RemoteBrokerOffsetStore.java    |  8 +++---
 .../client/impl/factory/MQClientInstance.java      | 30 ----------------------
 .../store/RemoteBrokerOffsetStoreTest.java         |  3 ++-
 3 files changed, 6 insertions(+), 35 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 6b76238..15b5bec 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -199,10 +199,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
     @Override
     public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
         MQBrokerException, InterruptedException, MQClientException {
-        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
         if (null == findBrokerResult) {
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
         }
 
         if (findBrokerResult != null) {
@@ -226,11 +226,11 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
 
     private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException {
-        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
         if (null == findBrokerResult) {
 
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
         }
 
         if (findBrokerResult != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 9651943..db81143 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -967,36 +967,6 @@ public class MQClientInstance {
         return this.consumerTable.get(group);
     }
 
-    public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
-        String brokerAddr = null;
-        boolean slave = false;
-        boolean found = false;
-
-        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
-        if (map != null && !map.isEmpty()) {
-            for (Map.Entry<Long, String> entry : map.entrySet()) {
-                Long id = entry.getKey();
-                brokerAddr = entry.getValue();
-                if (brokerAddr != null) {
-                    found = true;
-                    if (MixAll.MASTER_ID == id) {
-                        slave = false;
-                    } else {
-                        slave = true;
-                    }
-                    break;
-
-                }
-            } // end of for
-        }
-
-        if (found) {
-            return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
-        }
-
-        return null;
-    }
-
     public String findBrokerAddressInPublish(final String brokerName) {
         HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
         if (map != null && !map.isEmpty()) {
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index f762910..ec7a4cf 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
@@ -58,7 +59,7 @@ public class RemoteBrokerOffsetStoreTest {
         System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
         String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
         when(mQClientFactory.getClientId()).thenReturn(clientId);
-        when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false));
+        when(mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false)).thenReturn(new FindBrokerResult("127.0.0.1", false));
         when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI);
     }