You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by st...@apache.org on 2017/01/22 12:12:13 UTC

[08/31] incubator-rocketmq git commit: [RocketMQ-58] Add integration test for RocketMQ, also thanks @fenglianghfl for this commit, closes apache/incubator-rocketmq#46

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java
new file mode 100644
index 0000000..57b69d2
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.order;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener;
+import org.apache.rocketmq.test.message.MessageQueueMsg;
+import org.apache.rocketmq.test.util.MQWait;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class OrderMsgRebalanceIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(OrderMsgRebalanceIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s !", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testTwoConsumersBalance() {
+        int msgSize = 10;
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQOrderListener());
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQOrderListener());
+        TestUtils.waitForSeconds(waitTime);
+
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner());
+        assertThat(recvAll).isEqualTo(true);
+
+        boolean balance = VerifyUtils.verifyBalance(producer.getAllMsgBody().size(),
+            VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+                consumer1.getListner().getAllUndupMsgBody()).size(),
+            VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+                consumer2.getListner().getAllUndupMsgBody()).size());
+        assertThat(balance).isEqualTo(true);
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListner()).getMsgs()))
+            .isEqualTo(true);
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+
+    @Test
+    public void testFourConsuemrBalance() {
+        int msgSize = 20;
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQOrderListener());
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQOrderListener());
+        RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQOrderListener());
+        RMQNormalConsumer consumer4 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQOrderListener());
+        TestUtils.waitForSeconds(waitTime);
+
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner(), consumer3.getListner(),
+            consumer4.getListner());
+        assertThat(recvAll).isEqualTo(true);
+
+        boolean balance = VerifyUtils
+            .verifyBalance(producer.getAllMsgBody().size(),
+                VerifyUtils
+                    .getFilterdMessage(producer.getAllMsgBody(),
+                        consumer1.getListner().getAllUndupMsgBody())
+                    .size(),
+                VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+                    consumer2.getListner().getAllUndupMsgBody()).size(),
+                VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+                    consumer3.getListner().getAllUndupMsgBody()).size(),
+                VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+                    consumer4.getListner().getAllUndupMsgBody()).size());
+        logger.info(String.format("consumer1:%s;consumer2:%s;consumer3:%s,consumer4:%s",
+            consumer1.getListner().getAllMsgBody().size(),
+            consumer2.getListner().getAllMsgBody().size(),
+            consumer3.getListner().getAllMsgBody().size(),
+            consumer4.getListner().getAllMsgBody().size()));
+        assertThat(balance).isEqualTo(true);
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListner()).getMsgs()))
+            .isEqualTo(true);
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListner()).getMsgs()))
+            .isEqualTo(true);
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer3.getListner()).getMsgs()))
+            .isEqualTo(true);
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer4.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgWithTagIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgWithTagIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgWithTagIT.java
new file mode 100644
index 0000000..7db77de
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgWithTagIT.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.order;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener;
+import org.apache.rocketmq.test.message.MessageQueueMsg;
+import org.apache.rocketmq.test.util.MQWait;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class OrderMsgWithTagIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(OrderMsgIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        shutDown();
+    }
+
+    @Test
+    public void testOrderMsgWithTagSubAll() {
+        int msgSize = 10;
+        String tag = "jueyin_tag";
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQOrderListener());
+
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize, tag);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(mqMsgs.getMsgBodys());
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+
+    @Test
+    public void testOrderMsgWithTagSubTag() {
+        int msgSize = 5;
+        String tag = "jueyin_tag";
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag, new RMQOrderListener());
+
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize, tag);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(mqMsgs.getMsgBodys());
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+
+    @Test
+    public void testOrderMsgWithTag1AndTag2SubTag1() {
+        int msgSize = 5;
+        String tag1 = "jueyin_tag_1";
+        String tag2 = "jueyin_tag_2";
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag1, new RMQOrderListener());
+
+        List<MessageQueue> mqs = producer.getMessageQueue();
+
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize, tag2);
+        producer.send(mqMsgs.getMsgsWithMQ());
+        producer.clearMsg();
+
+        mqMsgs = new MessageQueueMsg(mqs, msgSize, tag1);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(mqMsgs.getMsgBodys());
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+
+    @Test
+    public void testTwoConsumerSubTag() {
+        int msgSize = 10;
+        String tag1 = "jueyin_tag_1";
+        String tag2 = "jueyin_tag_2";
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag1,
+            new RMQOrderListener("consumer1"));
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, topic, tag2,
+            new RMQOrderListener("consumer2"));
+        List<MessageQueue> mqs = producer.getMessageQueue();
+
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize, tag1);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        mqMsgs = new MessageQueueMsg(mqs, msgSize, tag2);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner());
+        assertThat(recvAll).isEqualTo(true);
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListner()).getMsgs()))
+            .isEqualTo(true);
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+
+    @Test
+    public void testConsumeTwoTag() {
+        int msgSize = 10;
+        String tag1 = "jueyin_tag_1";
+        String tag2 = "jueyin_tag_2";
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic,
+            String.format("%s||%s", tag1, tag2), new RMQOrderListener());
+
+        List<MessageQueue> mqs = producer.getMessageQueue();
+
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize, tag1);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        mqMsgs = new MessageQueueMsg(mqs, msgSize, tag2);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            consumer.getListner());
+        assertThat(recvAll).isEqualTo(true);
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdExceptionIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdExceptionIT.java
new file mode 100644
index 0000000..a1520a0
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdExceptionIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.querymsg;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class QueryMsgByIdExceptionIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(QueryMsgByKeyIT.class);
+    private static RMQNormalProducer producer = null;
+    private static String topic = null;
+
+    @BeforeClass
+    public static void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        shutDown();
+    }
+
+    @Test
+    public void testQueryMsgByErrorMsgId() {
+        producer.clearMsg();
+        int msgSize = 20;
+        String errorMsgId = "errorMsgId";
+        producer.send(msgSize);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+
+        MessageExt queryMsg = null;
+        try {
+            queryMsg = producer.getProducer().viewMessage(errorMsgId);
+        } catch (Exception e) {
+        }
+
+        assertThat(queryMsg).isNull();
+    }
+
+    @Test
+    public void testQueryMsgByNullMsgId() {
+        producer.clearMsg();
+        int msgSize = 20;
+        String errorMsgId = null;
+        producer.send(msgSize);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+
+        MessageExt queryMsg = null;
+        try {
+            queryMsg = producer.getProducer().viewMessage(errorMsgId);
+        } catch (Exception e) {
+        }
+
+        assertThat(queryMsg).isNull();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java
new file mode 100644
index 0000000..92c40c7
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.querymsg;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class QueryMsgByIdIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(QueryMsgByIdIT.class);
+    private RMQNormalProducer producer = null;
+    private RMQNormalConsumer consumer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+        consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+    }
+
+    @After
+    public void tearDown() {
+        shutDown();
+    }
+
+    @Test
+    public void testQueryMsg() {
+        int msgSize = 20;
+        producer.send(msgSize);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()));
+
+        MessageExt recvMsg = (MessageExt) consumer.getListner().getFirstMsg();
+        MessageExt queryMsg = null;
+        try {
+            TestUtils.waitForMonment(3000);
+            queryMsg = producer.getProducer().viewMessage(((MessageClientExt) recvMsg).getOffsetMsgId());
+        } catch (Exception e) {
+        }
+
+        assertThat(queryMsg).isNotNull();
+        assertThat(new String(queryMsg.getBody())).isEqualTo(new String(recvMsg.getBody()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
new file mode 100644
index 0000000..ec45a29
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.querymsg;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.factory.MQMessageFactory;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class QueryMsgByKeyIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(QueryMsgByKeyIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        shutDown();
+    }
+
+    @Test
+    public void testQueryMsg() {
+        int msgSize = 20;
+        String key = "jueyin";
+        long begin = System.currentTimeMillis();
+        List<Object> msgs = MQMessageFactory.getKeyMsg(topic, key, msgSize);
+        producer.send(msgs);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+
+        List<MessageExt> queryMsgs = null;
+        try {
+            TestUtils.waitForMonment(500 * 3);
+            queryMsgs = producer.getProducer().queryMessage(topic, key, msgSize, begin - 5000,
+                System.currentTimeMillis() + 5000).getMessageList();
+        } catch (Exception e) {
+        }
+
+        assertThat(queryMsgs).isNotNull();
+        assertThat(queryMsgs.size()).isEqualTo(msgSize);
+    }
+
+    @Test
+    public void testQueryMax() {
+        int msgSize = 500;
+        int max = 64 * brokerNum;
+        String key = "jueyin";
+        long begin = System.currentTimeMillis();
+        List<Object> msgs = MQMessageFactory.getKeyMsg(topic, key, msgSize);
+        producer.send(msgs);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+
+        List<MessageExt> queryMsgs = null;
+        try {
+            queryMsgs = producer.getProducer().queryMessage(topic, key, msgSize, begin - 15000,
+                System.currentTimeMillis() + 15000).getMessageList();
+
+            int i = 3;
+            while (queryMsgs == null || queryMsgs.size() != brokerNum) {
+                i--;
+                queryMsgs = producer.getProducer().queryMessage(topic, key, msgSize, begin - 15000,
+                    System.currentTimeMillis() + 15000).getMessageList();
+                TestUtils.waitForMonment(1000);
+
+                if (i == 0 || (queryMsgs != null && queryMsgs.size() == max)) {
+                    break;
+                }
+            }
+        } catch (Exception e) {
+        }
+
+        assertThat(queryMsgs).isNotNull();
+        assertThat(queryMsgs.size()).isEqualTo(max);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/delay/DelayConf.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/delay/DelayConf.java b/test/src/test/java/org/apache/rocketmq/test/delay/DelayConf.java
new file mode 100644
index 0000000..6c23473
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/delay/DelayConf.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.delay;
+
+import org.apache.rocketmq.test.base.BaseConf;
+
+public class DelayConf extends BaseConf {
+    protected static final int[] DELAY_LEVEL = {
+        1, 5, 10, 30, 1 * 60, 5 * 60, 10 * 60,
+        30 * 60, 1 * 3600, 2 * 3600, 6 * 3600, 12 * 3600, 1 * 24 * 3600};
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java b/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java
new file mode 100644
index 0000000..5206dcb
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.delay;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.factory.MQMessageFactory;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQDelayListner;
+import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NormalMsgDelayIT extends DelayConf {
+    private static Logger logger = Logger.getLogger(NormalMsgStaticBalanceIT.class);
+    protected int msgSize = 100;
+    private RMQNormalProducer producer = null;
+    private RMQNormalConsumer consumer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+        consumer = getConsumer(nsAddr, topic, "*", new RMQOrderListener());
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testDelayLevell() {
+        int delayLevel = 1;
+        List<Object> delayMsgs = MQMessageFactory.getDelayMsg(topic, delayLevel, msgSize);
+        producer.send(delayMsgs);
+        Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()));
+        Assert.assertEquals("Timer is not correct", true,
+            VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000,
+                ((RMQDelayListner) consumer.getListner()).getMsgDelayTimes()));
+    }
+
+    @Test
+    public void testDelayLevel2() {
+        int delayLevel = 2;
+        List<Object> delayMsgs = MQMessageFactory.getDelayMsg(topic, delayLevel, msgSize);
+        producer.send(delayMsgs);
+        Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(),
+            DELAY_LEVEL[delayLevel - 1] * 1000 * 2);
+        Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()));
+        Assert.assertEquals("Timer is not correct", true,
+            VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000,
+                ((RMQDelayListner) consumer.getListner()).getMsgDelayTimes()));
+    }
+
+    @Test
+    public void testDelayLevel3() {
+        int delayLevel = 3;
+        List<Object> delayMsgs = MQMessageFactory.getDelayMsg(topic, delayLevel, msgSize);
+        producer.send(delayMsgs);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(),
+            DELAY_LEVEL[delayLevel - 1] * 1000 * 2);
+        Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()));
+        Assert.assertEquals("Timer is not correct", true,
+            VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000,
+                ((RMQDelayListner) consumer.getListner()).getMsgDelayTimes()));
+    }
+
+    @Test
+    public void testDelayLevel4() {
+        int delayLevel = 4;
+        List<Object> delayMsgs = MQMessageFactory.getDelayMsg(topic, delayLevel, msgSize);
+        producer.send(delayMsgs);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(),
+            DELAY_LEVEL[delayLevel - 1] * 1000 * 2);
+        Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()));
+        Assert.assertEquals("Timer is not correct", true,
+            VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000,
+                ((RMQDelayListner) consumer.getListner()).getMsgDelayTimes()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
new file mode 100644
index 0000000..c422501
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.smoke;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class NormalMessageSendAndRecvIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(NormalMessageSendAndRecvIT.class);
+    private RMQNormalConsumer consumer = null;
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+        consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testSynSendMessage() {
+        int msgSize = 10;
+        producer.send(msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/test/src/test/resources/log4j.xml b/test/src/test/resources/log4j.xml
new file mode 100644
index 0000000..3031095
--- /dev/null
+++ b/test/src/test/resources/log4j.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %p [%F %M %L]: %m%n"/>
+        </layout>
+        <filter class="org.apache.log4j.varia.LevelRangeFilter">
+            <param name="LevelMax" value="ERROR"/>
+            <param name="LevelMin" value="TRACE"/>
+        </filter>
+    </appender>
+
+    <appender name="TOTAL" class="org.apache.log4j.RollingFileAppender">
+        <param name="File" value="it_test.log"/>
+        <param name="append" value="false"/>
+        <param name="MaxFileSize" value="10240000"/>
+        <param name="MaxBackupIndex" value="10"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
+        </layout>
+    </appender>
+
+    <root>
+        <level value="OFF"/>
+        <appender-ref ref="TOTAL"/>
+    </root>
+</log4j:configuration>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/test/src/test/resources/logback-test.xml b/test/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..eb12a9a
--- /dev/null
+++ b/test/src/test/resources/logback-test.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<configuration>
+
+    <appender name="DefaultAppender" class="ch.qos.logback.core.ConsoleAppender">
+        <append>true</append>
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH\:mm\:ss,GMT+8} %p %t - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <root>
+        <level value="OFF"/>
+        <appender-ref ref="DefaultAppender"/>
+    </root>
+</configuration>