You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this text version.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2021/12/08 11:30:59 UTC
[rocketmq] branch develop updated: fix(client): fetch and commit offset use master broker firstly
This is an automated email from the ASF dual-hosted git repository.
zhangyang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e2a4b37 fix(client): fetch and commit offset use master broker firstly
e2a4b37 is described below
commit e2a4b3778519ef761dac9ce4963eb20905fb35f4
Author: lushilin <lu...@bytedance.com>
AuthorDate: Wed Dec 8 15:58:42 2021 +0800
fix(client): fetch and commit offset use master broker firstly
---
.../consumer/store/RemoteBrokerOffsetStore.java | 8 +++---
.../client/impl/factory/MQClientInstance.java | 30 ----------------------
.../store/RemoteBrokerOffsetStoreTest.java | 3 ++-
3 files changed, 6 insertions(+), 35 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 6b76238..15b5bec 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -199,10 +199,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+ FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
}
if (findBrokerResult != null) {
@@ -226,11 +226,11 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+ FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
}
if (findBrokerResult != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 9651943..db81143 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -967,36 +967,6 @@ public class MQClientInstance {
return this.consumerTable.get(group);
}
- public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
- String brokerAddr = null;
- boolean slave = false;
- boolean found = false;
-
- HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
- if (map != null && !map.isEmpty()) {
- for (Map.Entry<Long, String> entry : map.entrySet()) {
- Long id = entry.getKey();
- brokerAddr = entry.getValue();
- if (brokerAddr != null) {
- found = true;
- if (MixAll.MASTER_ID == id) {
- slave = false;
- } else {
- slave = true;
- }
- break;
-
- }
- } // end of for
- }
-
- if (found) {
- return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
- }
-
- return null;
- }
-
public String findBrokerAddressInPublish(final String brokerName) {
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index f762910..ec7a4cf 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
@@ -58,7 +59,7 @@ public class RemoteBrokerOffsetStoreTest {
System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
when(mQClientFactory.getClientId()).thenReturn(clientId);
- when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false));
+ when(mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false)).thenReturn(new FindBrokerResult("127.0.0.1", false));
when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI);
}