You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/09 11:56:17 UTC
[rocketmq] 02/02: Finish the findBrokerAddr for admin publish subscribe
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 08618d512ef1c3e711c65447571eb4ba7962a744
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 9 19:53:53 2021 +0800
Finish the findBrokerAddr for admin publish subscribe
---
.../consumer/store/RemoteBrokerOffsetStore.java | 8 +++---
.../impl/consumer/DefaultMQPullConsumerImpl.java | 6 ++++-
.../impl/consumer/DefaultMQPushConsumerImpl.java | 29 +++++++++++++++++-----
.../client/impl/consumer/PullAPIWrapper.java | 8 +++---
.../client/impl/consumer/RebalanceImpl.java | 7 +++---
.../impl/producer/DefaultMQProducerImpl.java | 6 ++---
6 files changed, 43 insertions(+), 21 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 6b76238..7364856 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -199,10 +199,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+ FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
}
if (findBrokerResult != null) {
@@ -226,11 +226,11 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+ FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
}
if (findBrokerResult != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index d04b040..89ce7db 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -578,7 +578,11 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
- String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
+ String destBrokerName = brokerName;
+ if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(destBrokerName)) {
+ destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPullConsumer.queueWithNamespace(new MessageQueue(msg.getTopic(), msg.getBrokerName(), msg.getQueueId())));
+ }
+ String brokerAddr = (null != destBrokerName) ? this.mQClientFactory.findBrokerAddressInPublish(destBrokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
if (UtilAll.isBlank(consumerGroup)) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index dafa555..d00c4e8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -725,6 +725,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
desBrokerName = tmpBrokerName;
}
}
+ if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(desBrokerName)) {
+ desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(msg.getTopic(), msg.getBrokerName(), msg.getQueueId())));
+ }
+
String brokerAddr = null;
if (null != desBrokerName) {
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(desBrokerName);
@@ -765,15 +769,21 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
long queueOffset = ExtraInfoUtil.getQueueOffset(extraInfoStrs);
String topic = message.getTopic();
+ String desBrokerName = brokerName;
+ if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName)) {
+ desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(topic, brokerName, queueId)));
+ }
+
+
FindBrokerResult
- findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true);
}
if (findBrokerResult == null) {
- log.error("The broker[" + brokerName + "] not exist");
+ log.error("The broker[" + desBrokerName + "] not exist");
return;
}
@@ -806,11 +816,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
String brokerName = ExtraInfoUtil.getBrokerName(extraInfoStrs);
int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs);
+
+ String desBrokerName = brokerName;
+ if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName)) {
+ desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(topic, brokerName, queueId)));
+ }
+
FindBrokerResult
- findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true);
}
if (findBrokerResult != null) {
ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader();
@@ -820,10 +836,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setExtraInfo(extraInfo);
requestHeader.setInvisibleTime(invisibleTime);
+ //here the broker should be polished
this.mQClientFactory.getMQClientAPIImpl().changeInvisibleTimeAsync(brokerName, findBrokerResult.getBrokerAddr(), requestHeader, ASYNC_TIMEOUT, callback);
return;
}
- throw new MQClientException("The broker[" + brokerName + "] not exist", null);
+ throw new MQClientException("The broker[" + desBrokerName + "] not exist", null);
}
public int getMaxReconsumeTimes() {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index a379b1c..6d966a6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -239,12 +239,12 @@ public class PullAPIWrapper {
int queueId = mq.getQueueId();
FindBrokerResult findBrokerResult =
- this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
+ this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
findBrokerResult =
- this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
+ this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
this.recalculatePullFromWhichNode(mq), false);
}
@@ -373,10 +373,10 @@ public class PullAPIWrapper {
public void popAsync(MessageQueue mq, long invisibleTime, int maxNums, String consumerGroup,
long timeout, PopCallback popCallback, boolean poll, int initMode, boolean order, String expressionType, String expression)
throws MQClientException, RemotingException, InterruptedException {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+ FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
}
if (findBrokerResult != null) {
PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index ab0d885..5788d1c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -76,7 +76,7 @@ public abstract class RebalanceImpl {
}
public void unlock(final MessageQueue mq, final boolean oneway) {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+ FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
@@ -141,7 +141,8 @@ public abstract class RebalanceImpl {
continue;
}
- Set<MessageQueue> mqs = result.get(mq.getBrokerName());
+ String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
+ Set<MessageQueue> mqs = result.get(destBrokerName);
if (null == mqs) {
mqs = new HashSet<MessageQueue>();
result.put(mq.getBrokerName(), mqs);
@@ -154,7 +155,7 @@ public abstract class RebalanceImpl {
}
public boolean lock(final MessageQueue mq) {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+ FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 52a2d9c..7719597 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -721,11 +721,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
- String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
+ String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
+ String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName);
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
+ brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName);
}
SendMessageContext context = null;