You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/14 05:51:13 UTC

[rocketmq] 18/18: Add integration test

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 50651ef8ddc66e59ccc063022d253562b94a8370
Author: nowinkey <no...@tom.com>
AuthorDate: Mon Feb 13 21:12:33 2023 +0800

    Add integration test
---
 .../test/smoke/NormalMessageSendAndRecvIT.java     | 52 +++++++++++++++++-----
 1 file changed, 42 insertions(+), 10 deletions(-)

diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
index f3b30b5af..1876cee64 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
@@ -17,18 +17,18 @@
 
 package org.apache.rocketmq.test.smoke;
 
-import java.time.Duration;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.collect.ImmutableList;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.base.IntegrationTestBase;
 import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
 import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
@@ -39,6 +39,11 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static com.google.common.truth.Truth.assertThat;
 
@@ -112,11 +117,38 @@ public class NormalMessageSendAndRecvIT extends BaseConf {
 
     @Test
     public void testSynSendMessageWhenEnableBuildConsumeQueueConcurrently() throws Exception {
-        Properties properties = new Properties();
-        properties.setProperty("enableBuildConsumeQueueConcurrently", "true");
-        defaultMQAdminExt.updateBrokerConfig(brokerController1.getBrokerAddr(), properties);
-        defaultMQAdminExt.updateBrokerConfig(brokerController2.getBrokerAddr(), properties);
-        defaultMQAdminExt.updateBrokerConfig(brokerController3.getBrokerAddr(), properties);
+        resetStoreConfigWithEnableBuildConsumeQueueConcurrently(true); // restart the three brokers with the flag enabled
+
         testSynSendMessage();
+
+        resetStoreConfigWithEnableBuildConsumeQueueConcurrently(false); // NOTE(review): not in a finally block, so this restore is skipped if testSynSendMessage() throws
+    }
+
+    void resetStoreConfigWithEnableBuildConsumeQueueConcurrently(boolean enableBuildConsumeQueueConcurrently) { // shuts down and re-creates all three brokers with the given flag, then rebuilds the broker list and map
+        { // broker 1
+            brokerController1.shutdown();
+            MessageStoreConfig storeConfig = brokerController1.getMessageStoreConfig(); // reuse the config objects of the stopped broker
+            BrokerConfig brokerConfig = brokerController1.getBrokerConfig();
+            storeConfig.setEnableBuildConsumeQueueConcurrently(enableBuildConsumeQueueConcurrently);
+            brokerController1 = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig); // replace the field with the newly created controller
+        }
+        { // broker 2: same sequence as broker 1
+            brokerController2.shutdown();
+            MessageStoreConfig storeConfig = brokerController2.getMessageStoreConfig();
+            BrokerConfig brokerConfig = brokerController2.getBrokerConfig();
+            storeConfig.setEnableBuildConsumeQueueConcurrently(enableBuildConsumeQueueConcurrently);
+            brokerController2 = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig);
+        }
+        { // broker 3: same sequence as broker 1
+            brokerController3.shutdown();
+            MessageStoreConfig storeConfig = brokerController3.getMessageStoreConfig();
+            BrokerConfig brokerConfig = brokerController3.getBrokerConfig();
+            storeConfig.setEnableBuildConsumeQueueConcurrently(enableBuildConsumeQueueConcurrently);
+            brokerController3 = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig);
+        }
+        brokerControllerList = ImmutableList.of(brokerController1, brokerController2, brokerController3); // point the list at the new controller instances
+        brokerControllerMap = brokerControllerList.stream().collect( // rebuild the map, keyed by each controller's broker name
+                Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(), Function.identity()));
     }
+
 }