You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/05/06 02:24:45 UTC

[rocketmq] branch pop_consumer updated: [RIP-19] Pop Consuming (integration test)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch pop_consumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/pop_consumer by this push:
     new 80bc36c  [RIP-19] Pop Consuming (integration test)
     new 0938aab  Merge pull request #2835 from hill007299/pop_consumer
80bc36c is described below

commit 80bc36cf304e5e938ff1dbe6c20bcf7664daf2d9
Author: hill007299 <hi...@126.com>
AuthorDate: Tue Mar 9 11:22:37 2021 +0800

    [RIP-19] Pop Consuming (integration test)
---
 .../rocketmq/test/base/IntegrationTestBase.java    | 12 +--
 .../test/client/consumer/pop/PopSubCheckIT.java    | 89 ++++++++++++++++++++++
 .../test/smoke/NormalMessageSendAndRecvIT.java     |  6 ++
 3 files changed, 101 insertions(+), 6 deletions(-)

diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index c484e87..a32da03 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.test.base;
 
+import com.google.common.truth.Truth;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
@@ -26,16 +27,15 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.common.namesrv.NamesrvConfig;
 import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.test.util.MQAdmin;
 import org.apache.rocketmq.test.util.TestUtils;
-import org.junit.Assert;
 
 public class IntegrationTestBase {
     public static InternalLogger logger = InternalLoggerFactory.getLogger(IntegrationTestBase.class);
@@ -113,7 +113,7 @@ public class IntegrationTestBase {
         nameServerNettyServerConfig.setListenPort(nextPort());
         NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
         try {
-            Assert.assertTrue(namesrvController.initialize());
+            Truth.assertThat(namesrvController.initialize()).isTrue();
             logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort());
             namesrvController.start();
         } catch (Exception e) {
@@ -149,7 +149,7 @@ public class IntegrationTestBase {
         storeConfig.setHaListenPort(nextPort());
         BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
         try {
-            Assert.assertTrue(brokerController.initialize());
+            Truth.assertThat(brokerController.initialize()).isTrue();
             logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
             brokerController.start();
         } catch (Throwable t) {
@@ -169,8 +169,8 @@ public class IntegrationTestBase {
             if (createResult) {
                 break;
             } else if (System.currentTimeMillis() - startTime > topicCreateTime) {
-                Assert.fail(String.format("topic[%s] is created failed after:%d ms", topic,
-                    System.currentTimeMillis() - startTime));
+                Truth.assertWithMessage(String.format("topic[%s] is created failed after:%d ms", topic,
+                    System.currentTimeMillis() - startTime)).fail();
                 break;
             } else {
                 TestUtils.waitForMoment(500);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopSubCheckIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopSubCheckIT.java
new file mode 100644
index 0000000..74a9e04
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopSubCheckIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.pop;
+
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageRequestMode;
+import org.apache.rocketmq.logging.inner.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
+import org.apache.rocketmq.test.util.RandomUtil;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class PopSubCheckIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(PopSubCheckIT.class);
+    private String group;
+
+    private DefaultMQAdminExt defaultMQAdminExt;
+
+    @Before
+    public void setUp() throws Exception {
+        group = initConsumerGroup();
+
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExt.setInstanceName(RandomUtil.getStringByUUID());
+        defaultMQAdminExt.start();
+    }
+
+    @After
+    public void tearDown() {
+        defaultMQAdminExt.shutdown();
+        super.shutdown();
+    }
+
+    @Test
+    public void testNormalPopAck() throws Exception {
+        String topic = initTopic();
+        logger.info(String.format("use topic: %s; group: %s !", topic, group));
+
+        RMQNormalProducer producer = getProducer(nsAddr, topic);
+        producer.getProducer().setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
+
+        RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+
+        for (String brokerAddr : new String[]{brokerController1.getBrokerAddr(), brokerController2.getBrokerAddr()}) {
+            defaultMQAdminExt.setMessageRequestMode(brokerAddr, topic, group, MessageRequestMode.POP, 8, 60_000);
+        }
+
+        int msgNum = 1;
+        producer.send(msgNum);
+        Assert.assertEquals("Not all sent succeeded", msgNum, producer.getAllUndupMsgBody().size());
+        logger.info(producer.getFirstMsg());
+
+        consumer.getListener().waitForMessageConsume(msgNum, 30_000);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), consumer.getListener().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+        for (Object o : consumer.getListener().getAllOriginMsg()) {
+            MessageClientExt msg = (MessageClientExt) o;
+            assertThat(msg.getProperty(MessageConst.PROPERTY_POP_CK)).isNotEmpty();
+        }
+
+        consumer.getListener().waitForMessageConsume(msgNum, 3_000 * 9);
+        assertThat(consumer.getListener().getAllOriginMsg().size()).isEqualTo(msgNum);
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
index c788655..81dc864 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
@@ -18,6 +18,8 @@
 package org.apache.rocketmq.test.smoke;
 
 import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.test.base.BaseConf;
 import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -58,5 +60,9 @@ public class NormalMessageSendAndRecvIT extends BaseConf {
         assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
             consumer.getListener().getAllMsgBody()))
             .containsExactlyElementsIn(producer.getAllMsgBody());
+        for (Object o : consumer.getListener().getAllOriginMsg()) {
+            MessageClientExt msg = (MessageClientExt) o;
+            assertThat(msg.getProperty(MessageConst.PROPERTY_POP_CK)).isNull();
+        }
     }
 }