You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2022/07/04 07:56:34 UTC

[rocketmq] branch 5.0.0-beta updated: Fix static topic test, await the client metadata to be refreshed (#4500)

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta by this push:
     new 52d1c8cca Fix static topic test, await the client metadata to be refreshed (#4500)
52d1c8cca is described below

commit 52d1c8cca1b676fd81ba147af2c4f1e2131dbfd3
Author: Zhendong Liu <zh...@yeah.net>
AuthorDate: Mon Jul 4 15:56:19 2022 +0800

    Fix static topic test, await the client metadata to be refreshed (#4500)
    
    * Fix the unstable test
    
    * Fix
    
    * Fix
    
    * Await refresh metadata instead of sleep
    
    * Trivious changes
---
 .travis.yml                                        |  5 +-
 .../rocketmq/test/util/MQAdminTestUtils.java       | 36 +++++++++++++
 .../org/apache/rocketmq/test/base/BaseConf.java    | 19 +++++++
 .../rocketmq/test/statictopic/StaticTopicIT.java   | 63 +++++++++++++++++-----
 4 files changed, 109 insertions(+), 14 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 837ae1fe7..8a2d8c749 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -50,8 +50,9 @@ before_script:
 script:
   - mvn verify -DskipTests
   - travis_retry mvn -B clean apache-rat:check
-  - travis_retry mvn -B install jacoco:report coveralls:report
-  - travis_retry mvn -B clean install -pl test -Pit-test
+  - travis_retry mvn -B clean test jacoco:report coveralls:report
+  - travis_retry mvn -B clean test -pl test -Pit-test
+  - travis_retry mvn -B clean install -DskipTests
 
 after_success:
   - mvn sonar:sonar -Psonar-apache
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 7f6a2b6ee..d376cd0a1 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -23,12 +23,15 @@ import org.apache.commons.cli.PosixParser;
 import org.apache.log4j.Logger;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.rpc.ClientMetadata;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
@@ -46,6 +49,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ForkJoinPool;
 
+import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig;
+
 public class MQAdminTestUtils {
     private static Logger log = Logger.getLogger(MQAdminTestUtils.class);
 
@@ -153,6 +158,36 @@ public class MQAdminTestUtils {
         return false;
     }
 
+
+    public static boolean awaitStaticTopicMs(long timeMs, String topic, DefaultMQAdminExt defaultMQAdminExt, MQClientInstance clientInstance) throws Exception {
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start <= timeMs) {
+            if (checkStaticTopic(topic, defaultMQAdminExt, clientInstance)) {
+                return true;
+            }
+            Thread.sleep(100);
+        }
+        return false;
+    }
+
+    //Check if the client metadata is consistent with server metadata
+    public static boolean checkStaticTopic(String topic, DefaultMQAdminExt defaultMQAdminExt, MQClientInstance clientInstance) throws Exception {
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+        assert !brokerConfigMap.isEmpty();
+        TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
+        TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
+        Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
+        for (int i = 0; i < globalIdMap.size(); i++) {
+            TopicQueueMappingOne mappingOne = globalIdMap.get(i);
+            String mockBrokerName = TopicQueueMappingUtils.getMockBrokerName(mappingOne.getMappingDetail().getScope());
+            String bnameFromRoute = clientInstance.getBrokerNameFromMessageQueue(new MessageQueue(topic, mockBrokerName, mappingOne.getGlobalId()));
+            if (!mappingOne.getBname().equals(bnameFromRoute)) {
+                return false;
+            }
+        }
+        return  true;
+    }
+
     //should only be test, if some middle operation failed, it dose not backup the brokerConfigMap
     public static Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
@@ -172,6 +207,7 @@ public class MQAdminTestUtils {
         MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false, defaultMQAdminExt);
     }
 
+
     //for test only
     public static void remappingStaticTopicWithNegativeLogicOffset(String topic, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 8420fdd92..4e29c84c6 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -116,6 +116,25 @@ public class BaseConf {
         ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
     }
 
+    public boolean awaitDispatchMs(long timeMs) throws Exception {
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start <= timeMs) {
+            boolean allOk = true;
+            for (BrokerController brokerController: brokerControllerList) {
+                if (brokerController.getMessageStore().dispatchBehindBytes() != 0) {
+                    allOk = false;
+                    break;
+                }
+            }
+            if (allOk) {
+                return true;
+            }
+            Thread.sleep(100);
+        }
+        return false;
+    }
+
+
     public static String initTopic() {
         String topic = MQRandomUtils.getRandomTopic();
         return initTopicWithName(topic);
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index ed93e862c..41c9c7e4f 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -21,6 +21,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.log4j.Logger;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.OffsetWrapper;
@@ -76,6 +79,7 @@ public class StaticTopicIT extends BaseConf {
         defaultMQAdminExt.start();
     }
 
+
     @Test
     public void testCommandsWithCluster() throws Exception {
         //This case is used to mock the env to test the command manually
@@ -93,12 +97,11 @@ public class StaticTopicIT extends BaseConf {
         }
         {
             MQAdminTestUtils.remappingStaticTopicWithCommand(topic, null, clusterName, nsAddr);
-            Thread.sleep(500);
-            sendMessagesAndCheck(producer, getBrokers(), topic, queueNum, msgEachQueue, 100);
+            awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), consumer.getConsumer(), defaultMQAdminExt);
+            sendMessagesAndCheck(producer, getBrokers(), topic, queueNum, msgEachQueue, msgEachQueue);
         }
     }
 
-    @Ignore
     @Test
     public void testCommandsWithBrokers() throws Exception {
         //This case is used to mock the env to test the command manually
@@ -117,7 +120,7 @@ public class StaticTopicIT extends BaseConf {
         {
             Set<String> brokers = ImmutableSet.of(broker2Name);
             MQAdminTestUtils.remappingStaticTopicWithCommand(topic, brokers, null, nsAddr);
-            Thread.sleep(500);
+            awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), consumer.getConsumer(), defaultMQAdminExt);
             sendMessagesAndCheck(producer, brokers, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
             consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 2);
         }
@@ -172,7 +175,7 @@ public class StaticTopicIT extends BaseConf {
         }
         Assert.assertEquals(0, producer.getSendErrorMsg().size());
         //leave the time to build the cq
-        Thread.sleep(100);
+        Assert.assertTrue(awaitDispatchMs(500));
         for(MessageQueue messageQueue: messageQueueList) {
             Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
             Assert.assertEquals(msgEachQueue + baseOffset, defaultMQAdminExt.maxOffset(messageQueue));
@@ -209,6 +212,7 @@ public class StaticTopicIT extends BaseConf {
         /*System.out.println("produce:" + producer.getAllMsgBody().size());
         System.out.println("consume:" + consumer.getListener().getAllMsgBody().size());*/
 
+        Assert.assertEquals(producer.getAllMsgBody().size(), consumer.getListener().getAllMsgBody().size());
         assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
                 consumer.getListener().getAllMsgBody()))
                 .containsExactlyElementsIn(producer.getAllMsgBody());
@@ -292,13 +296,48 @@ public class StaticTopicIT extends BaseConf {
                 Assert.assertEquals(broker2Name, mappingOne.getBname());
                 Assert.assertEquals(TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
             }
-            Thread.sleep(500);
+            awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), consumer.getConsumer(), defaultMQAdminExt);
             sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
             consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 2);
         }
     }
 
 
+    public boolean awaitRefreshStaticTopicMetadata(long timeMs, String topic, DefaultMQProducer producer, DefaultMQPushConsumer consumer, DefaultMQAdminExt adminExt) throws Exception {
+        long start = System.currentTimeMillis();
+        MQClientInstance currentInstance = null;
+        while (System.currentTimeMillis() - start <= timeMs) {
+            boolean allOk = true;
+            if (producer != null) {
+                currentInstance = producer.getDefaultMQProducerImpl().getmQClientFactory();
+                currentInstance.updateTopicRouteInfoFromNameServer(topic);
+                if (!MQAdminTestUtils.checkStaticTopic(topic, adminExt, currentInstance)) {
+                    allOk = false;
+                }
+            }
+            if (consumer != null) {
+                currentInstance = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory();
+                currentInstance.updateTopicRouteInfoFromNameServer(topic);
+                if (!MQAdminTestUtils.checkStaticTopic(topic, adminExt, currentInstance)) {
+                    allOk = false;
+                }
+            }
+            if (adminExt != null) {
+                currentInstance = adminExt.getDefaultMQAdminExtImpl().getMqClientInstance();
+                currentInstance.updateTopicRouteInfoFromNameServer(topic);
+                if (!MQAdminTestUtils.checkStaticTopic(topic, adminExt, currentInstance)) {
+                    allOk = false;
+                }
+            }
+            if (allOk) {
+                return true;
+            }
+            Thread.sleep(100);
+        }
+        return false;
+    }
+
+
     @Test
     public void testDoubleReadCheckConsumerOffset() throws Exception {
         String topic = "static" + MQRandomUtils.getRandomTopic();
@@ -336,11 +375,11 @@ public class StaticTopicIT extends BaseConf {
             Set<String> targetBrokers = ImmutableSet.of(brokers.get(i));
             MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
             //make the metadata
-            Thread.sleep(500);
+            awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), null, defaultMQAdminExt);
             sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, (i + 1) * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
         }
 
-        TestUtils.waitForSeconds(20);
+        TestUtils.waitForSeconds(1);
         consumeStats = defaultMQAdminExt.examineConsumeStats(group);
 
         messageQueues = producer.getMessageQueue();
@@ -367,7 +406,7 @@ public class StaticTopicIT extends BaseConf {
             Set<String> targetBrokers = ImmutableSet.of(broker1Name);
             MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
             //leave the time to refresh the metadata
-            Thread.sleep(500);
+            awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), null, defaultMQAdminExt);
             sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
         }
 
@@ -376,7 +415,7 @@ public class StaticTopicIT extends BaseConf {
             Set<String> targetBrokers = ImmutableSet.of(broker2Name);
             MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
             //leave the time to refresh the metadata
-            Thread.sleep(500);
+            awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), null, defaultMQAdminExt);
             sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1 * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
         }
 
@@ -385,7 +424,7 @@ public class StaticTopicIT extends BaseConf {
             Set<String> targetBrokers = ImmutableSet.of(broker3Name);
             MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
             //leave the time to refresh the metadata
-            Thread.sleep(500);
+            awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), null, defaultMQAdminExt);
             sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 2 * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
         }
 
@@ -467,7 +506,7 @@ public class StaticTopicIT extends BaseConf {
                 Assert.assertEquals(-1, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
             }
             //leave the time to refresh the metadata
-            Thread.sleep(500);
+            awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), null, defaultMQAdminExt);
             //here the gen should be 0
             sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
         }