You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2021/12/21 06:17:35 UTC
[rocketmq] branch develop updated: [ISSUE 3613] bug fix, solve message hash conflict in index file #3616
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0bc011c [ISSUE 3613] bug fix, solve message hash conflict in index file #3616
0bc011c is described below
commit 0bc011cef3ba5ff6c1a5af9fe89109f4f0ab414d
Author: xijiu <42...@qq.com>
AuthorDate: Tue Dec 21 14:12:31 2021 +0800
[ISSUE 3613] bug fix, solve message hash conflict in index file #3616
---
.../apache/rocketmq/client/impl/MQAdminImpl.java | 5 +-
.../org/apache/rocketmq/test/base/BaseConf.java | 7 ++-
.../client/producer/querymsg/QueryMsgByKeyIT.java | 57 ++++++++++++++++++++++
3 files changed, 66 insertions(+), 3 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 8884e4a..ba4eafa 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
@@ -406,12 +407,14 @@ public class MQAdminImpl {
}
} else {
String keys = msgExt.getKeys();
+ String msgTopic = msgExt.getTopic();
if (keys != null) {
boolean matched = false;
String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR);
if (keyArray != null) {
for (String k : keyArray) {
- if (key.equals(k)) {
+ // both topic and key must be equal at the same time
+ if (Objects.equals(key, k) && Objects.equals(topic, msgTopic)) {
matched = true;
break;
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index c6a835f..0f1c4bf 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -70,9 +70,12 @@ public class BaseConf {
public static String initTopic() {
String topic = MQRandomUtils.getRandomTopic();
- IntegrationTestBase.initTopic(topic, nsAddr, clusterName);
+ return initTopicWithName(topic);
+ }
- return topic;
+ public static String initTopicWithName(String topicName) {
+ IntegrationTestBase.initTopic(topicName, nsAddr, clusterName);
+ return topicName;
}
public static String initConsumerGroup() {
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
index 827d4f9..d7c4364 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
@@ -18,7 +18,9 @@
package org.apache.rocketmq.test.client.producer.querymsg;
import java.util.List;
+
import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -101,4 +103,59 @@ public class QueryMsgByKeyIT extends BaseConf {
assertThat(queryMsgs).isNotNull();
assertThat(queryMsgs.size()).isEqualTo(max);
}
+
+
+ @Test(expected = MQClientException.class)
+ public void testQueryMsgWithSameHash1() throws Exception {
+ int msgSize = 1;
+ String topicA = "AaTopic";
+ String keyA = "Aa";
+ String topicB = "BBTopic";
+ String keyB = "BB";
+
+ initTopicWithName(topicA);
+ initTopicWithName(topicB);
+
+ RMQNormalProducer producerA = getProducer(nsAddr, topicA);
+ RMQNormalProducer producerB = getProducer(nsAddr, topicB);
+
+ List<Object> msgA = MQMessageFactory.getKeyMsg(topicA, keyA, msgSize);
+ List<Object> msgB = MQMessageFactory.getKeyMsg(topicB, keyB, msgSize);
+
+ producerA.send(msgA);
+ producerB.send(msgB);
+
+ long begin = System.currentTimeMillis() - 500000;
+ long end = System.currentTimeMillis() + 500000;
+ producerA.getProducer().queryMessage(topicA, keyB, msgSize * 10, begin, end).getMessageList();
+ }
+
+
+ @Test
+ public void testQueryMsgWithSameHash2() throws Exception {
+ int msgSize = 1;
+ String topicA = "AaAaTopic";
+ String keyA = "Aa";
+ String topicB = "BBBBTopic";
+ String keyB = "Aa";
+
+ initTopicWithName(topicA);
+ initTopicWithName(topicB);
+
+ RMQNormalProducer producerA = getProducer(nsAddr, topicA);
+ RMQNormalProducer producerB = getProducer(nsAddr, topicB);
+
+ List<Object> msgA = MQMessageFactory.getKeyMsg(topicA, keyA, msgSize);
+ List<Object> msgB = MQMessageFactory.getKeyMsg(topicB, keyB, msgSize);
+
+ producerA.send(msgA);
+ producerB.send(msgB);
+
+ long begin = System.currentTimeMillis() - 500000;
+ long end = System.currentTimeMillis() + 500000;
+ List<MessageExt> list = producerA.getProducer().queryMessage(topicA, keyA, msgSize * 10, begin, end).getMessageList();
+
+ assertThat(list).isNotNull();
+ assertThat(list.size()).isEqualTo(1);
+ }
}