You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/14 05:51:13 UTC
[rocketmq] 18/18: Add integration test
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 50651ef8ddc66e59ccc063022d253562b94a8370
Author: nowinkey <no...@tom.com>
AuthorDate: Mon Feb 13 21:12:33 2023 +0800
Add integration test
---
.../test/smoke/NormalMessageSendAndRecvIT.java | 52 +++++++++++++++++-----
1 file changed, 42 insertions(+), 10 deletions(-)
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
index f3b30b5af..1876cee64 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
@@ -17,18 +17,18 @@
package org.apache.rocketmq.test.smoke;
-import java.time.Duration;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
@@ -39,6 +39,11 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static com.google.common.truth.Truth.assertThat;
@@ -112,11 +117,38 @@ public class NormalMessageSendAndRecvIT extends BaseConf {
@Test
public void testSynSendMessageWhenEnableBuildConsumeQueueConcurrently() throws Exception {
- Properties properties = new Properties();
- properties.setProperty("enableBuildConsumeQueueConcurrently", "true");
- defaultMQAdminExt.updateBrokerConfig(brokerController1.getBrokerAddr(), properties);
- defaultMQAdminExt.updateBrokerConfig(brokerController2.getBrokerAddr(), properties);
- defaultMQAdminExt.updateBrokerConfig(brokerController3.getBrokerAddr(), properties);
+ resetStoreConfigWithEnableBuildConsumeQueueConcurrently(true); // recreate all three brokers with the flag switched on
+ try { // the flag must be switched back off even if the send/receive scenario throws
testSynSendMessage();
+ } finally { // runs on both success and failure so the recreated brokers never stay in concurrent mode
+ resetStoreConfigWithEnableBuildConsumeQueueConcurrently(false);
+ }
+ }
+ void resetStoreConfigWithEnableBuildConsumeQueueConcurrently(boolean enableBuildConsumeQueueConcurrently) { // shuts down and recreates brokers 1-3 with the given flag value
+ { // bare block only scopes storeConfig/brokerConfig so the same names can be reused per broker
+ brokerController1.shutdown(); // stop the running broker before a replacement is created
+ MessageStoreConfig storeConfig = brokerController1.getMessageStoreConfig(); // reuse the config objects of the broker just stopped
+ BrokerConfig brokerConfig = brokerController1.getBrokerConfig();
+ storeConfig.setEnableBuildConsumeQueueConcurrently(enableBuildConsumeQueueConcurrently); // NOTE(review): presumably only honoured at store start-up, hence the restart - confirm
+ brokerController1 = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig); // reference now points at the controller returned by createAndStartBroker
+ }
+ { // same sequence for broker 2
+ brokerController2.shutdown();
+ MessageStoreConfig storeConfig = brokerController2.getMessageStoreConfig();
+ BrokerConfig brokerConfig = brokerController2.getBrokerConfig();
+ storeConfig.setEnableBuildConsumeQueueConcurrently(enableBuildConsumeQueueConcurrently);
+ brokerController2 = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig);
+ }
+ { // same sequence for broker 3
+ brokerController3.shutdown();
+ MessageStoreConfig storeConfig = brokerController3.getMessageStoreConfig();
+ BrokerConfig brokerConfig = brokerController3.getBrokerConfig();
+ storeConfig.setEnableBuildConsumeQueueConcurrently(enableBuildConsumeQueueConcurrently);
+ brokerController3 = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig);
+ }
+ brokerControllerList = ImmutableList.of(brokerController1, brokerController2, brokerController3); // rebuild so the list holds the new controllers, not the shut-down ones
+ brokerControllerMap = brokerControllerList.stream().collect( // re-index the new controllers by their broker name
+ Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(), Function.identity()));
}
+
}