You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/20 08:44:00 UTC

[rocketmq] branch develop updated: [RIP-10] Add test cases of ScheduleMessageService (#782)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8357b44  [RIP-10] Add  test cases of  ScheduleMessageService (#782)
8357b44 is described below

commit 8357b4407e483111124b0c0304e4b1a7914a6d1c
Author: ranqiqiang <qi...@gmail.com>
AuthorDate: Wed Feb 20 16:43:56 2019 +0800

    [RIP-10] Add  test cases of  ScheduleMessageService (#782)
    
    [RIP-10] Add  test cases of  ScheduleMessageService
---
 .../store/schedule/ScheduleMessageServiceTest.java | 230 +++++++++++++++++++++
 1 file changed, 230 insertions(+)

diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
new file mode 100644
index 0000000..e8c9d12
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.schedule;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.store.*;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class ScheduleMessageServiceTest {
+
+
+    /**t
+     * defaultMessageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
+     */
+    String testMessageDelayLevel = "5s 10s";
+    /**
+     * choose delay level
+     * 1 = 5s
+     */
+    int delayLevel = 1;
+
+    private static final String storePath = System.getProperty("user.home")  + File.separator + "schedule_test";
+    private static final int commitLogFileSize = 1024;
+    private static final int cqFileSize = 10;
+    private static final int cqExtFileSize = 10 * (ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE + 64);
+
+    private static SocketAddress bornHost;
+    private static SocketAddress storeHost;
+    DefaultMessageStore messageStore;
+    MessageStoreConfig messageStoreConfig;
+    BrokerConfig brokerConfig;
+    ScheduleMessageService scheduleMessageService;
+
+    static String sendMessage =   " ------- schedule message test -------";
+
+
+    static {
+        try {
+            bornHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
+        try {
+            storeHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
+    }
+
+
+    @Before
+    public void init() throws Exception {
+        messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMessageDelayLevel(testMessageDelayLevel);
+        messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize);
+        messageStoreConfig.setMappedFileSizeConsumeQueueExt(cqExtFileSize);
+        messageStoreConfig.setMessageIndexEnable(false);
+        messageStoreConfig.setEnableConsumeQueueExt(true);
+        messageStoreConfig.setStorePathRootDir(storePath);
+        messageStoreConfig.setStorePathCommitLog(storePath + File.separator + "commitlog");
+
+        brokerConfig = new BrokerConfig();
+        BrokerStatsManager manager = new BrokerStatsManager(brokerConfig.getBrokerClusterName());
+        messageStore = new DefaultMessageStore(messageStoreConfig, manager, new MyMessageArrivingListener(), new BrokerConfig());
+
+        assertThat(messageStore.load()).isTrue();
+
+        messageStore.start();
+        scheduleMessageService = messageStore.getScheduleMessageService();
+    }
+
+
+
+    @Test
+    public void buildRunningStatsTest() throws InterruptedException {
+        MessageExtBrokerInner msg = buildMessage();
+        msg.setDelayTimeLevel(delayLevel);
+        messageStore.putMessage(msg);
+        // wait offsetTable
+        TimeUnit.SECONDS.sleep(1);
+        scheduleMessageService.buildRunningStats(new HashMap<String, String>() );
+    }
+
+
+    @Test
+    public void computeDeliverTimestampTest() {
+        // testMessageDelayLevel  just "5s 10s"
+        long storeTime = System.currentTimeMillis();
+        long time1 = scheduleMessageService.computeDeliverTimestamp(1, storeTime);
+        assertThat(time1).isEqualTo(storeTime + 5 * 1000);
+
+        long time2 = scheduleMessageService.computeDeliverTimestamp(2, storeTime);
+        assertThat(time2).isEqualTo(storeTime + 10 * 1000);
+
+    }
+
+
+    @Test
+    public void delayLevel2QueueIdTest() {
+        int queueId = ScheduleMessageService.delayLevel2QueueId(delayLevel);
+        assertThat(queueId).isEqualTo(delayLevel - 1);
+        queueId = ScheduleMessageService.queueId2DelayLevel(delayLevel);
+        assertThat(queueId).isEqualTo(delayLevel + 1);
+    }
+
+    @Test
+    public void deliverDelayedMessageTimerTaskTest() throws InterruptedException {
+        MessageExtBrokerInner msg = buildMessage();
+        // set delayLevel,and send delay message
+        msg.setDelayTimeLevel(delayLevel);
+        PutMessageResult result = messageStore.putMessage(msg);
+        assertThat(result.isOk()).isTrue();
+
+        // consumer message
+        Long offset = result.getAppendMessageResult().getLogicsOffset();
+        String messageGroup = "delayGroupTest";
+        GetMessageResult messageResult = messageStore.getMessage(messageGroup,msg.getTopic(),
+                msg.getQueueId(),offset,1,null);
+
+        // now, no message in queue,must wait > 5 seconds
+        assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
+
+
+        TimeUnit.SECONDS.sleep(6);
+        messageResult = messageStore.getMessage(messageGroup,msg.getTopic(),
+                msg.getQueueId(),offset,1,null);
+        // now,found the message
+        assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND);
+
+
+        // get the message body
+        ByteBuffer byteBuffer = ByteBuffer.allocate(messageResult.getBufferTotalSize());
+        List<ByteBuffer>  byteBufferList = messageResult.getMessageBufferList();
+        for (ByteBuffer bb : byteBufferList) {
+            byteBuffer.put(bb);
+        }
+
+        // warp and decode the message
+        byteBuffer = ByteBuffer.wrap(byteBuffer.array());
+        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
+        String retryMsg = new String(msgList.get(0).getBody());
+        assertThat(sendMessage).isEqualTo(retryMsg);
+
+        // add mapFile release
+        messageResult.release();
+
+    }
+
+    @Test
+    public void persist(){
+        // because of the method will wait 10s
+        scheduleMessageService.persist();
+    }
+
+
+    @After
+    public void shutdown() throws InterruptedException {
+        TimeUnit.SECONDS.sleep(1);
+        scheduleMessageService.shutdown();
+        messageStore.shutdown();
+        messageStore.destroy();
+        File file = new File(messageStoreConfig.getStorePathRootDir());
+        UtilAll.deleteFile(file);
+    }
+
+
+    public MessageExtBrokerInner buildMessage() {
+
+        byte[] msgBody = sendMessage.getBytes();
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.setTopic("schedule_topic_test");
+        msg.setTags("schedule_tag");
+        msg.setKeys("schedule_key");
+        msg.setBody(msgBody);
+        msg.setQueueId(0);
+        msg.setSysFlag(0);
+        msg.setBornTimestamp(System.currentTimeMillis());
+        msg.setStoreHost(storeHost);
+        msg.setBornHost(bornHost);
+        return msg;
+    }
+
+
+    private class MyMessageArrivingListener implements MessageArrivingListener {
+        @Override
+        public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
+                             byte[] filterBitMap, Map<String, String> properties) {
+        }
+    }
+
+
+
+
+}