You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/09 11:56:16 UTC
[rocketmq] 01/02: Convert mq to broker name
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit f308cd308ec0b767c5e36320e6bbdb99c526e677
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 9 16:41:10 2021 +0800
Convert mq to broker name
---
.../apache/rocketmq/client/impl/MQAdminImpl.java | 17 ++++++-------
.../impl/consumer/DefaultMQPushConsumerImpl.java | 13 +++++++---
.../client/impl/factory/MQClientInstance.java | 29 ++++++----------------
.../impl/producer/DefaultMQProducerImpl.java | 20 ++++-----------
4 files changed, 29 insertions(+), 50 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 04492e3..ce4c94a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -41,7 +41,6 @@ import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.InvokeCallback;
@@ -191,10 +190,10 @@ public class MQAdminImpl {
if (logicalQueueRouteData != null) {
mq = logicalQueueRouteData.getMessageQueue();
}
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+ String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+ brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
}
if (brokerAddr != null) {
@@ -227,10 +226,10 @@ public class MQAdminImpl {
previousQueueRouteData = maxQueueRouteData;
mq = maxQueueRouteData.getMessageQueue();
}
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+ String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+ brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
}
if (brokerAddr != null) {
@@ -256,10 +255,10 @@ public class MQAdminImpl {
mq = minQueueRouteData.getMessageQueue();
}
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+ String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+ brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
}
if (brokerAddr != null) {
@@ -298,10 +297,10 @@ public class MQAdminImpl {
mq = minQueueRouteData.getMessageQueue();
}
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+ String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+ brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
}
if (brokerAddr != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index d236646..dafa555 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -718,11 +718,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
+ String desBrokerName = brokerName;
+ if (mq != null) {
+ String tmpBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
+ if (tmpBrokerName != null) {
+ desBrokerName = tmpBrokerName;
+ }
+ }
String brokerAddr = null;
- if (null != mq) {
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
- } else if (null != brokerName) {
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
+ if (null != desBrokerName) {
+ brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(desBrokerName);
} else {
RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index a7d521f..950eae7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1055,16 +1055,11 @@ public class MQClientInstance {
return mq.getBrokerName();
}
- public FindBrokerResult findBrokerAddressInAdmin(final MessageQueue mq) {
- String brokerName = getBrokerNameFromMessageQueue(mq);
+
+ public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
if (brokerName == null) {
return null;
- } else {
- return findBrokerAddressInAdmin(brokerName);
}
- }
-
- public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
String brokerAddr = null;
boolean slave = false;
boolean found = false;
@@ -1094,16 +1089,11 @@ public class MQClientInstance {
return null;
}
- public String findBrokerAddressInPublish(final MessageQueue mq) {
- String brokerName = getBrokerNameFromMessageQueue(mq);
+
+ public String findBrokerAddressInPublish(final String brokerName) {
if (brokerName == null) {
return null;
- } else {
- return findBrokerAddressInPublish(brokerName);
}
- }
- //This is used for retry only
- public String findBrokerAddressInPublish(final String brokerName) {
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
return map.get(MixAll.MASTER_ID);
@@ -1112,20 +1102,15 @@ public class MQClientInstance {
return null;
}
- public FindBrokerResult findBrokerAddressInSubscribe(final MessageQueue mq, final long brokerId, final boolean onlyThisBroker) {
- String brokerName = getBrokerNameFromMessageQueue(mq);
- if (brokerName == null) {
- return null;
- } else {
- return findBrokerAddressInSubscribe(brokerName, brokerId, onlyThisBroker);
- }
- }
public FindBrokerResult findBrokerAddressInSubscribe(
final String brokerName,
final long brokerId,
final boolean onlyThisBroker
) {
+ if (brokerName == null) {
+ return null;
+ }
String brokerAddr = null;
boolean slave = false;
boolean found = false;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 578c843..52a2d9c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -16,16 +16,12 @@
*/
package org.apache.rocketmq.client.impl.producer;
-import com.alibaba.fastjson.JSON;
-import com.google.common.base.Objects;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.Locale;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -45,7 +41,6 @@ import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.exception.MQRedirectException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.hook.CheckForbiddenContext;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
@@ -67,7 +62,6 @@ import org.apache.rocketmq.client.producer.RequestFutureTable;
import org.apache.rocketmq.client.producer.RequestResponseFuture;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendResultForLogicalQueue;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
@@ -88,13 +82,9 @@ import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.utils.CorrelationIdUtil;
import org.apache.rocketmq.logging.InternalLogger;
@@ -731,14 +721,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+ String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
+ String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq);
+ brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
}
- String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
-
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
@@ -1342,7 +1331,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
- final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue());
+ final String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(defaultMQProducer.queueWithNamespace(sendResult.getMessageQueue()));
+ final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName);
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());