You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/05/06 02:24:45 UTC
[rocketmq] branch pop_consumer updated: [RIP-19] Pop Consuming
(integration test)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch pop_consumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/pop_consumer by this push:
new 80bc36c [RIP-19] Pop Consuming (integration test)
new 0938aab Merge pull request #2835 from hill007299/pop_consumer
80bc36c is described below
commit 80bc36cf304e5e938ff1dbe6c20bcf7664daf2d9
Author: hill007299 <hi...@126.com>
AuthorDate: Tue Mar 9 11:22:37 2021 +0800
[RIP-19] Pop Consuming (integration test)
---
.../rocketmq/test/base/IntegrationTestBase.java | 12 +--
.../test/client/consumer/pop/PopSubCheckIT.java | 89 ++++++++++++++++++++++
.../test/smoke/NormalMessageSendAndRecvIT.java | 6 ++
3 files changed, 101 insertions(+), 6 deletions(-)
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index c484e87..a32da03 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.test.base;
+import com.google.common.truth.Truth;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
@@ -26,16 +27,15 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.test.util.MQAdmin;
import org.apache.rocketmq.test.util.TestUtils;
-import org.junit.Assert;
public class IntegrationTestBase {
public static InternalLogger logger = InternalLoggerFactory.getLogger(IntegrationTestBase.class);
@@ -113,7 +113,7 @@ public class IntegrationTestBase {
nameServerNettyServerConfig.setListenPort(nextPort());
NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
try {
- Assert.assertTrue(namesrvController.initialize());
+ Truth.assertThat(namesrvController.initialize()).isTrue();
logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort());
namesrvController.start();
} catch (Exception e) {
@@ -149,7 +149,7 @@ public class IntegrationTestBase {
storeConfig.setHaListenPort(nextPort());
BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
try {
- Assert.assertTrue(brokerController.initialize());
+ Truth.assertThat(brokerController.initialize()).isTrue();
logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
brokerController.start();
} catch (Throwable t) {
@@ -169,8 +169,8 @@ public class IntegrationTestBase {
if (createResult) {
break;
} else if (System.currentTimeMillis() - startTime > topicCreateTime) {
- Assert.fail(String.format("topic[%s] is created failed after:%d ms", topic,
- System.currentTimeMillis() - startTime));
+ Truth.assertWithMessage(String.format("topic[%s] is created failed after:%d ms", topic,
+ System.currentTimeMillis() - startTime)).fail();
break;
} else {
TestUtils.waitForMoment(500);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopSubCheckIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopSubCheckIT.java
new file mode 100644
index 0000000..74a9e04
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopSubCheckIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.pop;
+
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageRequestMode;
+import org.apache.rocketmq.logging.inner.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
+import org.apache.rocketmq.test.util.RandomUtil;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class PopSubCheckIT extends BaseConf {
+ private static Logger logger = Logger.getLogger(PopSubCheckIT.class);
+ private String group;
+
+ private DefaultMQAdminExt defaultMQAdminExt;
+
+ @Before
+ public void setUp() throws Exception {
+ group = initConsumerGroup();
+
+ defaultMQAdminExt = new DefaultMQAdminExt();
+ defaultMQAdminExt.setInstanceName(RandomUtil.getStringByUUID());
+ defaultMQAdminExt.start();
+ }
+
+ @After
+ public void tearDown() {
+ defaultMQAdminExt.shutdown();
+ super.shutdown();
+ }
+
+ @Test
+ public void testNormalPopAck() throws Exception {
+ String topic = initTopic();
+ logger.info(String.format("use topic: %s; group: %s !", topic, group));
+
+ RMQNormalProducer producer = getProducer(nsAddr, topic);
+ producer.getProducer().setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
+
+ RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+
+ for (String brokerAddr : new String[]{brokerController1.getBrokerAddr(), brokerController2.getBrokerAddr()}) {
+ defaultMQAdminExt.setMessageRequestMode(brokerAddr, topic, group, MessageRequestMode.POP, 8, 60_000);
+ }
+
+ int msgNum = 1;
+ producer.send(msgNum);
+ Assert.assertEquals("Not all sent succeeded", msgNum, producer.getAllUndupMsgBody().size());
+ logger.info(producer.getFirstMsg());
+
+ consumer.getListener().waitForMessageConsume(msgNum, 30_000);
+ assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), consumer.getListener().getAllMsgBody()))
+ .containsExactlyElementsIn(producer.getAllMsgBody());
+ for (Object o : consumer.getListener().getAllOriginMsg()) {
+ MessageClientExt msg = (MessageClientExt) o;
+ assertThat(msg.getProperty(MessageConst.PROPERTY_POP_CK)).isNotEmpty();
+ }
+
+ consumer.getListener().waitForMessageConsume(msgNum, 3_000 * 9);
+ assertThat(consumer.getListener().getAllOriginMsg().size()).isEqualTo(msgNum);
+ }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
index c788655..81dc864 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
@@ -18,6 +18,8 @@
package org.apache.rocketmq.test.smoke;
import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -58,5 +60,9 @@ public class NormalMessageSendAndRecvIT extends BaseConf {
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
+ for (Object o : consumer.getListener().getAllOriginMsg()) {
+ MessageClientExt msg = (MessageClientExt) o;
+ assertThat(msg.getProperty(MessageConst.PROPERTY_POP_CK)).isNull();
+ }
}
}