You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2021/12/21 06:17:35 UTC

[rocketmq] branch develop updated: [ISSUE 3613] bug fix, solve message hash conflict in index file #3616

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0bc011c  [ISSUE 3613] bug fix, solve message hash conflict in index file #3616
0bc011c is described below

commit 0bc011cef3ba5ff6c1a5af9fe89109f4f0ab414d
Author: xijiu <42...@qq.com>
AuthorDate: Tue Dec 21 14:12:31 2021 +0800

    [ISSUE 3613] bug fix, solve message hash conflict in index file #3616
---
 .../apache/rocketmq/client/impl/MQAdminImpl.java   |  5 +-
 .../org/apache/rocketmq/test/base/BaseConf.java    |  7 ++-
 .../client/producer/querymsg/QueryMsgByKeyIT.java  | 57 ++++++++++++++++++++++
 3 files changed, 66 insertions(+), 3 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 8884e4a..ba4eafa 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -406,12 +407,14 @@ public class MQAdminImpl {
                             }
                         } else {
                             String keys = msgExt.getKeys();
+                            String msgTopic = msgExt.getTopic();
                             if (keys != null) {
                                 boolean matched = false;
                                 String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR);
                                 if (keyArray != null) {
                                     for (String k : keyArray) {
-                                        if (key.equals(k)) {
+                                        // both topic and key must be equal at the same time
+                                        if (Objects.equals(key, k) && Objects.equals(topic, msgTopic)) {
                                             matched = true;
                                             break;
                                         }
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index c6a835f..0f1c4bf 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -70,9 +70,12 @@ public class BaseConf {
 
     public static String initTopic() {
         String topic = MQRandomUtils.getRandomTopic();
-        IntegrationTestBase.initTopic(topic, nsAddr, clusterName);
+        return initTopicWithName(topic);
+    }
 
-        return topic;
+    public static String initTopicWithName(String topicName) {
+        IntegrationTestBase.initTopic(topicName, nsAddr, clusterName);
+        return topicName;
     }
 
     public static String initConsumerGroup() {
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
index 827d4f9..d7c4364 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
@@ -18,7 +18,9 @@
 package org.apache.rocketmq.test.client.producer.querymsg;
 
 import java.util.List;
+
 import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.test.base.BaseConf;
 import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -101,4 +103,59 @@ public class QueryMsgByKeyIT extends BaseConf {
         assertThat(queryMsgs).isNotNull();
         assertThat(queryMsgs.size()).isEqualTo(max);
     }
+
+
+    @Test(expected = MQClientException.class)
+    public void testQueryMsgWithSameHash1() throws Exception {
+        int msgSize = 1;
+        String topicA = "AaTopic";
+        String keyA = "Aa";
+        String topicB = "BBTopic";
+        String keyB = "BB";
+
+        initTopicWithName(topicA);
+        initTopicWithName(topicB);
+
+        RMQNormalProducer producerA = getProducer(nsAddr, topicA);
+        RMQNormalProducer producerB = getProducer(nsAddr, topicB);
+
+        List<Object> msgA = MQMessageFactory.getKeyMsg(topicA, keyA, msgSize);
+        List<Object> msgB = MQMessageFactory.getKeyMsg(topicB, keyB, msgSize);
+
+        producerA.send(msgA);
+        producerB.send(msgB);
+
+        long begin = System.currentTimeMillis() - 500000;
+        long end = System.currentTimeMillis() + 500000;
+        producerA.getProducer().queryMessage(topicA, keyB, msgSize * 10, begin, end).getMessageList();
+    }
+
+
+    @Test
+    public void testQueryMsgWithSameHash2() throws Exception {
+        int msgSize = 1;
+        String topicA = "AaAaTopic";
+        String keyA = "Aa";
+        String topicB = "BBBBTopic";
+        String keyB = "Aa";
+
+        initTopicWithName(topicA);
+        initTopicWithName(topicB);
+
+        RMQNormalProducer producerA = getProducer(nsAddr, topicA);
+        RMQNormalProducer producerB = getProducer(nsAddr, topicB);
+
+        List<Object> msgA = MQMessageFactory.getKeyMsg(topicA, keyA, msgSize);
+        List<Object> msgB = MQMessageFactory.getKeyMsg(topicB, keyB, msgSize);
+
+        producerA.send(msgA);
+        producerB.send(msgB);
+
+        long begin = System.currentTimeMillis() - 500000;
+        long end = System.currentTimeMillis() + 500000;
+        List<MessageExt> list = producerA.getProducer().queryMessage(topicA, keyA, msgSize * 10, begin, end).getMessageList();
+
+        assertThat(list).isNotNull();
+        assertThat(list.size()).isEqualTo(1);
+    }
 }