You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2022/03/29 04:31:45 UTC
[rocketmq-mqtt] 17/43: fix the multi level topic special char
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
commit 531f7a91d1dd52d114099e25ff8fd713762556f3
Author: tianliuliu <64...@qq.com>
AuthorDate: Thu Mar 10 20:02:57 2022 +0800
fix the multi level topic special char
---
.../rocketmq/mqtt/common/facade/LmqQueueStore.java | 4 ---
.../rocketmq/mqtt/ds/notify/NotifyManager.java | 13 +++++-----
.../mqtt/ds/store/LmqOffsetStoreManager.java | 9 +++----
.../mqtt/ds/store/LmqQueueStoreManager.java | 29 +++++++++++-----------
.../mqtt/ds/test/TestLmqQueueStoreManager.java | 4 +--
.../rocketmq/mqtt/example/RocketMQProducer.java | 9 ++++---
6 files changed, 33 insertions(+), 35 deletions(-)
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
index 5da4d7d..b393abe 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
@@ -27,10 +27,6 @@ import org.apache.rocketmq.mqtt.common.model.QueueOffset;
import org.apache.rocketmq.mqtt.common.model.StoreResult;
public interface LmqQueueStore {
- String LMQ_PREFIX = "%LMQ%";
- String PROPERTY_INNER_MULTI_DISPATCH = "INNER_MULTI_DISPATCH";
- String PROPERTY_INNER_MULTI_QUEUE_OFFSET = "INNER_MULTI_QUEUE_OFFSET";
- String MULTI_DISPATCH_QUEUE_SPLITTER = ",";
/**
* put message and atomic dispatch to multiple queues
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
index 029cbb8..69d7991 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
@@ -29,8 +29,8 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager;
import org.apache.rocketmq.mqtt.common.model.Constants;
import org.apache.rocketmq.mqtt.common.model.MessageEvent;
@@ -195,18 +195,19 @@ public class NotifyManager {
messageEvent.setPubTopic(message.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_TOPIC));
return;
}
- if (StringUtils.isNotBlank(message.getUserProperty(LmqQueueStore.PROPERTY_INNER_MULTI_DISPATCH))) {
+ if (StringUtils.isNotBlank(message.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
// maybe from rmq
- String s = message.getUserProperty(LmqQueueStore.PROPERTY_INNER_MULTI_DISPATCH);
- String[] lmqSet = s.split(LmqQueueStore.MULTI_DISPATCH_QUEUE_SPLITTER);
+ String s = message.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+ String[] lmqSet = s.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
for (String lmq : lmqSet) {
if (TopicUtils.isWildCard(lmq)) {
continue;
}
- if (!lmq.contains(LmqQueueStore.LMQ_PREFIX)) {
+ if (!lmq.contains(MixAll.LMQ_PREFIX)) {
continue;
}
- messageEvent.setPubTopic(lmq.replace(LmqQueueStore.LMQ_PREFIX, ""));
+ String originQueue = lmq.replace(MixAll.LMQ_PREFIX, "");
+ messageEvent.setPubTopic(StringUtils.replace(originQueue, "%","/"));
}
}
}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
index 3651945..fd8d718 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
@@ -26,7 +26,6 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.mqtt.common.facade.LmqOffsetStore;
-import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.QueueOffset;
import org.apache.rocketmq.mqtt.common.model.Subscription;
@@ -81,8 +80,8 @@ public class LmqOffsetStoreManager implements LmqOffsetStore {
String brokerAddress = tmpBrokerAddressMap.get(queue.getBrokerName());
QueueOffset queueOffset = each.getValue();
UpdateConsumerOffsetRequestHeader updateHeader = new UpdateConsumerOffsetRequestHeader();
- updateHeader.setTopic(queue.getQueueName());
- updateHeader.setConsumerGroup(LmqQueueStore.LMQ_PREFIX + clientId);
+ updateHeader.setTopic(StringUtils.replace(queue.getQueueName(), "/","%"));
+ updateHeader.setConsumerGroup(MixAll.LMQ_PREFIX + clientId);
updateHeader.setQueueId((int) queue.getQueueId());
updateHeader.setCommitOffset(queueOffset.getOffset());
defaultMQPullConsumer
@@ -112,8 +111,8 @@ public class LmqOffsetStoreManager implements LmqOffsetStore {
map.put(queue, queueOffset);
try {
QueryConsumerOffsetRequestHeader queryHeader = new QueryConsumerOffsetRequestHeader();
- queryHeader.setTopic(queue.getQueueName());
- queryHeader.setConsumerGroup(LmqQueueStore.LMQ_PREFIX + clientId);
+ queryHeader.setTopic(StringUtils.replace(queue.getQueueName(), "/","%"));
+ queryHeader.setConsumerGroup(MixAll.LMQ_PREFIX + clientId);
queryHeader.setQueueId((int) queue.getQueueId());
long offset = defaultMQPullConsumer
.getDefaultMQPullConsumerImpl()
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index c7a2b00..88b42aa 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -126,15 +126,16 @@ public class LmqQueueStoreManager implements LmqQueueStore {
message.setOffset(parseLmqOffset(queue, mqMessage));
if (StringUtils.isNotBlank(mqMessage.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_TOPIC))) {
message.setOriginTopic(mqMessage.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_TOPIC));
- } else if (StringUtils.isNotBlank(message.getUserProperty(LmqQueueStore.PROPERTY_INNER_MULTI_DISPATCH))) {
+ } else if (StringUtils.isNotBlank(message.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
// maybe from rmq
- String s = message.getUserProperty(LmqQueueStore.PROPERTY_INNER_MULTI_DISPATCH);
- String[] lmqSet = s.split(LmqQueueStore.MULTI_DISPATCH_QUEUE_SPLITTER);
+ String s = message.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+ String[] lmqSet = s.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
for (String lmq : lmqSet) {
if (TopicUtils.isWildCard(lmq)) {
continue;
}
- message.setOriginTopic(lmq.replace(LmqQueueStore.LMQ_PREFIX, ""));
+ String originQueue = lmq.replace(MixAll.LMQ_PREFIX, "");
+ message.setOriginTopic(StringUtils.replace(originQueue, "%","/"));
}
}
message.setFirstTopic(mqMessage.getTopic());
@@ -154,18 +155,18 @@ public class LmqQueueStoreManager implements LmqQueueStore {
}
private long parseLmqOffset(Queue queue, MessageExt mqMessage) {
- String multiDispatchQueue = mqMessage.getProperty(PROPERTY_INNER_MULTI_DISPATCH);
+ String multiDispatchQueue = mqMessage.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
if (StringUtils.isBlank(multiDispatchQueue)) {
return mqMessage.getQueueOffset();
}
- String multiQueueOffset = mqMessage.getProperty(PROPERTY_INNER_MULTI_QUEUE_OFFSET);
+ String multiQueueOffset = mqMessage.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET);
if (StringUtils.isBlank(multiQueueOffset)) {
return mqMessage.getQueueOffset();
}
- String[] queues = multiDispatchQueue.split(MULTI_DISPATCH_QUEUE_SPLITTER);
- String[] queueOffsets = multiQueueOffset.split(MULTI_DISPATCH_QUEUE_SPLITTER);
+ String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+ String[] queueOffsets = multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
for (int i = 0; i < queues.length; i++) {
- if ((LMQ_PREFIX + queue.getQueueName()).equals(queues[i])) {
+ if ((MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%")).equals(queues[i])) {
return Long.parseLong(queueOffsets[i]);
}
}
@@ -177,10 +178,10 @@ public class LmqQueueStoreManager implements LmqQueueStore {
CompletableFuture<StoreResult> result = new CompletableFuture<>();
org.apache.rocketmq.common.message.Message mqMessage = toMQMessage(message);
mqMessage.setTags(Constants.MQTT_TAG);
- mqMessage.putUserProperty(PROPERTY_INNER_MULTI_DISPATCH,
+ mqMessage.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
StringUtils.join(
- queues.stream().map(s -> LMQ_PREFIX + s).collect(Collectors.toSet()),
- MULTI_DISPATCH_QUEUE_SPLITTER));
+ queues.stream().map(s -> StringUtils.replace(s, "/", "%")).map(s -> MixAll.LMQ_PREFIX + s).collect(Collectors.toSet()),
+ MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
try {
long start = System.currentTimeMillis();
defaultMQProducer.send(mqMessage,
@@ -210,7 +211,7 @@ public class LmqQueueStoreManager implements LmqQueueStore {
try {
MessageQueue messageQueue = new MessageQueue(firstTopic, queue.getBrokerName(), (int) queue.getQueueId());
long start = System.currentTimeMillis();
- String lmqTopic = LMQ_PREFIX + queue.getQueueName();
+ String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%");
pull(lmqTopic, messageQueue, queueOffset.getOffset(), (int) count, new PullCallback() {
@Override
public void onSuccess(org.apache.rocketmq.client.consumer.PullResult pullResult) {
@@ -409,7 +410,7 @@ public class LmqQueueStoreManager implements LmqQueueStore {
}
private long maxOffset(Queue queue) throws MQClientException {
- String lmqTopic = LMQ_PREFIX + queue.getQueueName();
+ String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%");
MQClientInstance mQClientFactory = defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory();
String brokerAddr = mQClientFactory.findBrokerAddressInPublish(queue.getBrokerName());
if (null == brokerAddr) {
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestLmqQueueStoreManager.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestLmqQueueStoreManager.java
index 01bc106..b43ce6e 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestLmqQueueStoreManager.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestLmqQueueStoreManager.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.QueueOffset;
@@ -50,7 +51,6 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
-import static org.apache.rocketmq.mqtt.common.facade.LmqQueueStore.PROPERTY_INNER_MULTI_DISPATCH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@@ -92,7 +92,7 @@ public class TestLmqQueueStoreManager {
lmqQueueStoreManager.putMessage(queues, message);
ArgumentCaptor<org.apache.rocketmq.common.message.Message> argumentCaptor = ArgumentCaptor.forClass(org.apache.rocketmq.common.message.Message.class);
verify(defaultMQProducer).send(argumentCaptor.capture(), any(SendCallback.class));
- Assert.assertTrue(null != argumentCaptor.getValue().getUserProperty(PROPERTY_INNER_MULTI_DISPATCH));
+ Assert.assertTrue(null != argumentCaptor.getValue().getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH));
}
@Test
diff --git a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/RocketMQProducer.java b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/RocketMQProducer.java
index 9c80609..a35d884 100644
--- a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/RocketMQProducer.java
+++ b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/RocketMQProducer.java
@@ -22,8 +22,9 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -66,10 +67,10 @@ public class RocketMQProducer {
}
private static void setLmq(Message msg, Set<String> queues) {
- msg.putUserProperty(LmqQueueStore.PROPERTY_INNER_MULTI_DISPATCH,
+ msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
StringUtils.join(
- queues.stream().map(s -> LmqQueueStore.LMQ_PREFIX + s).collect(Collectors.toSet()),
- LmqQueueStore.MULTI_DISPATCH_QUEUE_SPLITTER));
+ queues.stream().map(s -> StringUtils.replace(s, "/", "%")).map(s -> MixAll.LMQ_PREFIX + s).collect(Collectors.toSet()),
+ MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
}
private static void sendMessage(int i) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {