You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2022/07/04 07:56:34 UTC
[rocketmq] branch 5.0.0-beta updated: Fix static topic test, await the client metadata to be refreshed (#4500)
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta by this push:
new 52d1c8cca Fix static topic test, await the client metadata to be refreshed (#4500)
52d1c8cca is described below
commit 52d1c8cca1b676fd81ba147af2c4f1e2131dbfd3
Author: Zhendong Liu <zh...@yeah.net>
AuthorDate: Mon Jul 4 15:56:19 2022 +0800
Fix static topic test, await the client metadata to be refreshed (#4500)
* Fix the unstable test
* Fix
* Fix
* Await refresh metadata instead of sleep
* Trivial changes
---
.travis.yml | 5 +-
.../rocketmq/test/util/MQAdminTestUtils.java | 36 +++++++++++++
.../org/apache/rocketmq/test/base/BaseConf.java | 19 +++++++
.../rocketmq/test/statictopic/StaticTopicIT.java | 63 +++++++++++++++++-----
4 files changed, 109 insertions(+), 14 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 837ae1fe7..8a2d8c749 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -50,8 +50,9 @@ before_script:
script:
- mvn verify -DskipTests
- travis_retry mvn -B clean apache-rat:check
- - travis_retry mvn -B install jacoco:report coveralls:report
- - travis_retry mvn -B clean install -pl test -Pit-test
+ - travis_retry mvn -B clean test jacoco:report coveralls:report
+ - travis_retry mvn -B clean test -pl test -Pit-test
+ - travis_retry mvn -B clean install -DskipTests
after_success:
- mvn sonar:sonar -Psonar-apache
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 7f6a2b6ee..d376cd0a1 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -23,12 +23,15 @@ import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
@@ -46,6 +49,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
+import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig;
+
public class MQAdminTestUtils {
private static Logger log = Logger.getLogger(MQAdminTestUtils.class);
@@ -153,6 +158,36 @@ public class MQAdminTestUtils {
return false;
}
+
+ public static boolean awaitStaticTopicMs(long timeMs, String topic, DefaultMQAdminExt defaultMQAdminExt, MQClientInstance clientInstance) throws Exception {
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start <= timeMs) {
+ if (checkStaticTopic(topic, defaultMQAdminExt, clientInstance)) {
+ return true;
+ }
+ Thread.sleep(100);
+ }
+ return false;
+ }
+
+ //Check if the client metadata is consistent with server metadata
+ public static boolean checkStaticTopic(String topic, DefaultMQAdminExt defaultMQAdminExt, MQClientInstance clientInstance) throws Exception {
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+ assert !brokerConfigMap.isEmpty();
+ TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap);
+ TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
+ Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(getMappingDetailFromConfig(brokerConfigMap.values()), false, true);
+ for (int i = 0; i < globalIdMap.size(); i++) {
+ TopicQueueMappingOne mappingOne = globalIdMap.get(i);
+ String mockBrokerName = TopicQueueMappingUtils.getMockBrokerName(mappingOne.getMappingDetail().getScope());
+ String bnameFromRoute = clientInstance.getBrokerNameFromMessageQueue(new MessageQueue(topic, mockBrokerName, mappingOne.getGlobalId()));
+ if (!mappingOne.getBname().equals(bnameFromRoute)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
//should only be test, if some middle operation failed, it dose not backup the brokerConfigMap
public static Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
@@ -172,6 +207,7 @@ public class MQAdminTestUtils {
MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false, defaultMQAdminExt);
}
+
//for test only
public static void remappingStaticTopicWithNegativeLogicOffset(String topic, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 8420fdd92..4e29c84c6 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -116,6 +116,25 @@ public class BaseConf {
ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
}
+ public boolean awaitDispatchMs(long timeMs) throws Exception {
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start <= timeMs) {
+ boolean allOk = true;
+ for (BrokerController brokerController: brokerControllerList) {
+ if (brokerController.getMessageStore().dispatchBehindBytes() != 0) {
+ allOk = false;
+ break;
+ }
+ }
+ if (allOk) {
+ return true;
+ }
+ Thread.sleep(100);
+ }
+ return false;
+ }
+
+
public static String initTopic() {
String topic = MQRandomUtils.getRandomTopic();
return initTopicWithName(topic);
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index ed93e862c..41c9c7e4f 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -21,6 +21,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
@@ -76,6 +79,7 @@ public class StaticTopicIT extends BaseConf {
defaultMQAdminExt.start();
}
+
@Test
public void testCommandsWithCluster() throws Exception {
//This case is used to mock the env to test the command manually
@@ -93,12 +97,11 @@ public class StaticTopicIT extends BaseConf {
}
{
MQAdminTestUtils.remappingStaticTopicWithCommand(topic, null, clusterName, nsAddr);
- Thread.sleep(500);
- sendMessagesAndCheck(producer, getBrokers(), topic, queueNum, msgEachQueue, 100);
+ awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), consumer.getConsumer(), defaultMQAdminExt);
+ sendMessagesAndCheck(producer, getBrokers(), topic, queueNum, msgEachQueue, msgEachQueue);
}
}
- @Ignore
@Test
public void testCommandsWithBrokers() throws Exception {
//This case is used to mock the env to test the command manually
@@ -117,7 +120,7 @@ public class StaticTopicIT extends BaseConf {
{
Set<String> brokers = ImmutableSet.of(broker2Name);
MQAdminTestUtils.remappingStaticTopicWithCommand(topic, brokers, null, nsAddr);
- Thread.sleep(500);
+ awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), consumer.getConsumer(), defaultMQAdminExt);
sendMessagesAndCheck(producer, brokers, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 2);
}
@@ -172,7 +175,7 @@ public class StaticTopicIT extends BaseConf {
}
Assert.assertEquals(0, producer.getSendErrorMsg().size());
//leave the time to build the cq
- Thread.sleep(100);
+ Assert.assertTrue(awaitDispatchMs(500));
for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(msgEachQueue + baseOffset, defaultMQAdminExt.maxOffset(messageQueue));
@@ -209,6 +212,7 @@ public class StaticTopicIT extends BaseConf {
/*System.out.println("produce:" + producer.getAllMsgBody().size());
System.out.println("consume:" + consumer.getListener().getAllMsgBody().size());*/
+ Assert.assertEquals(producer.getAllMsgBody().size(), consumer.getListener().getAllMsgBody().size());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
@@ -292,13 +296,48 @@ public class StaticTopicIT extends BaseConf {
Assert.assertEquals(broker2Name, mappingOne.getBname());
Assert.assertEquals(TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
}
- Thread.sleep(500);
+ awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), consumer.getConsumer(), defaultMQAdminExt);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 2);
}
}
+ public boolean awaitRefreshStaticTopicMetadata(long timeMs, String topic, DefaultMQProducer producer, DefaultMQPushConsumer consumer, DefaultMQAdminExt adminExt) throws Exception {
+ long start = System.currentTimeMillis();
+ MQClientInstance currentInstance = null;
+ while (System.currentTimeMillis() - start <= timeMs) {
+ boolean allOk = true;
+ if (producer != null) {
+ currentInstance = producer.getDefaultMQProducerImpl().getmQClientFactory();
+ currentInstance.updateTopicRouteInfoFromNameServer(topic);
+ if (!MQAdminTestUtils.checkStaticTopic(topic, adminExt, currentInstance)) {
+ allOk = false;
+ }
+ }
+ if (consumer != null) {
+ currentInstance = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory();
+ currentInstance.updateTopicRouteInfoFromNameServer(topic);
+ if (!MQAdminTestUtils.checkStaticTopic(topic, adminExt, currentInstance)) {
+ allOk = false;
+ }
+ }
+ if (adminExt != null) {
+ currentInstance = adminExt.getDefaultMQAdminExtImpl().getMqClientInstance();
+ currentInstance.updateTopicRouteInfoFromNameServer(topic);
+ if (!MQAdminTestUtils.checkStaticTopic(topic, adminExt, currentInstance)) {
+ allOk = false;
+ }
+ }
+ if (allOk) {
+ return true;
+ }
+ Thread.sleep(100);
+ }
+ return false;
+ }
+
+
@Test
public void testDoubleReadCheckConsumerOffset() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
@@ -336,11 +375,11 @@ public class StaticTopicIT extends BaseConf {
Set<String> targetBrokers = ImmutableSet.of(brokers.get(i));
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
//make the metadata
- Thread.sleep(500);
+ awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), null, defaultMQAdminExt);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, (i + 1) * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
}
- TestUtils.waitForSeconds(20);
+ TestUtils.waitForSeconds(1);
consumeStats = defaultMQAdminExt.examineConsumeStats(group);
messageQueues = producer.getMessageQueue();
@@ -367,7 +406,7 @@ public class StaticTopicIT extends BaseConf {
Set<String> targetBrokers = ImmutableSet.of(broker1Name);
MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
//leave the time to refresh the metadata
- Thread.sleep(500);
+ awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), null, defaultMQAdminExt);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
}
@@ -376,7 +415,7 @@ public class StaticTopicIT extends BaseConf {
Set<String> targetBrokers = ImmutableSet.of(broker2Name);
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
//leave the time to refresh the metadata
- Thread.sleep(500);
+ awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), null, defaultMQAdminExt);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1 * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
}
@@ -385,7 +424,7 @@ public class StaticTopicIT extends BaseConf {
Set<String> targetBrokers = ImmutableSet.of(broker3Name);
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
//leave the time to refresh the metadata
- Thread.sleep(500);
+ awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), null, defaultMQAdminExt);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 2 * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
}
@@ -467,7 +506,7 @@ public class StaticTopicIT extends BaseConf {
Assert.assertEquals(-1, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
}
//leave the time to refresh the metadata
- Thread.sleep(500);
+ awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), null, defaultMQAdminExt);
//here the gen should be 0
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
}