You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/22 06:23:59 UTC

[rocketmq] branch develop updated: [RIP-10] Add test cases for DefaultMessageStore.CleanCommitLogService and DefaultMessageStore.CleanConsumeQueueService (#836)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a9b6939  [RIP-10]  Add test cases for DefaultMessageStore.CleanCommitLogService and DefaultMessageStore.CleanConsumeQueueService (#836)
a9b6939 is described below

commit a9b69391dd274f2a1d17086944165ddd23e60851
Author: Eks OU <ek...@dingtalk.com>
AuthorDate: Fri Feb 22 14:23:51 2019 +0800

    [RIP-10]  Add test cases for DefaultMessageStore.CleanCommitLogService and DefaultMessageStore.CleanConsumeQueueService (#836)
    
    [RIP-10]  Add test cases for DefaultMessageStore.CleanCommitLogService and DefaultMessageStore.CleanConsumeQueueService
---
 .../store/DefaultMessageStoreCleanFilesTest.java   | 386 +++++++++++++++++++++
 1 file changed, 386 insertions(+)

diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
new file mode 100644
index 0000000..cf970e6
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Calendar;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.rocketmq.common.message.MessageDecoder.CHARSET_UTF8;
+import static org.apache.rocketmq.store.ConsumeQueue.CQ_STORE_UNIT_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test case for DefaultMessageStore.CleanCommitLogService and DefaultMessageStore.CleanConsumeQueueService
+ */
+public class DefaultMessageStoreCleanFilesTest {
+    private DefaultMessageStore messageStore;
+    private DefaultMessageStore.CleanCommitLogService cleanCommitLogService;
+    private DefaultMessageStore.CleanConsumeQueueService cleanConsumeQueueService;
+
+    private SocketAddress bornHost;
+    private SocketAddress storeHost;
+
+    private String topic = "test";
+    private int queueId = 0;
+    private int fileCountCommitLog = 55;
+    // exactly one message per CommitLog file.
+    private int msgCount = fileCountCommitLog;
+    private int mappedFileSize = 128;
+    private int fileReservedTime = 1;
+
+    @Before
+    public void init() throws Exception {
+        storeHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+        bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+    }
+
+    @Test
+    public void testDeleteExpiredFilesByTimeUp() throws Exception {
+        String deleteWhen = Calendar.getInstance().get(Calendar.HOUR_OF_DAY) + "";
+        // the max value of diskMaxUsedSpaceRatio
+        int diskMaxUsedSpaceRatio = 99;
+        // used to ensure that automatic file deletion is not triggered
+        double diskSpaceCleanForciblyRatio = 0.999D;
+        initMessageStore(deleteWhen, diskMaxUsedSpaceRatio, diskSpaceCleanForciblyRatio);
+
+        // build and put 55 messages, exactly one message per CommitLog file.
+        buildAndPutMessagesToMessageStore(msgCount);
+
+        // undo comment out the code below, if want to debug this case rather than just run it.
+        // Thread.sleep(1000 * 60 + 100);
+
+        MappedFileQueue commitLogQueue = getMappedFileQueueCommitLog();
+        assertEquals(fileCountCommitLog, commitLogQueue.getMappedFiles().size());
+
+        int fileCountConsumeQueue = getFileCountConsumeQueue();
+        MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
+        assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
+
+        int expireFileCount = 15;
+        expireFiles(commitLogQueue, expireFileCount);
+
+        // magic code 10 reference to MappedFileQueue#DELETE_FILES_BATCH_MAX
+        for (int a = 1, fileCount = expireFileCount; a <= (int) Math.ceil((double) expireFileCount / 10); a++, fileCount -= 10) {
+            cleanCommitLogService.run();
+            cleanConsumeQueueService.run();
+
+            int expectDeletedCount = fileCount >= 10 ? a * 10 : ((a - 1) * 10 + fileCount);
+            assertEquals(fileCountCommitLog - expectDeletedCount, commitLogQueue.getMappedFiles().size());
+
+            int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
+            int expectDeleteCountConsumeQueue = (int) Math.floor((double) expectDeletedCount / msgCountPerFile);
+            assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
+        }
+    }
+
+    @Test
+    public void testDeleteExpiredFilesBySpaceFull() throws Exception {
+        String deleteWhen = "04";
+        // the min value of diskMaxUsedSpaceRatio. make sure disk space usage is greater than 10%
+        int diskMaxUsedSpaceRatio = 1;
+        // used to ensure that automatic file deletion is not triggered
+        double diskSpaceCleanForciblyRatio = 0.999D;
+        initMessageStore(deleteWhen, diskMaxUsedSpaceRatio, diskSpaceCleanForciblyRatio);
+
+        // build and put 55 messages, exactly one message per CommitLog file.
+        buildAndPutMessagesToMessageStore(msgCount);
+
+        // undo comment out the code below, if want to debug this case rather than just run it.
+        // Thread.sleep(1000 * 60 + 100);
+
+        MappedFileQueue commitLogQueue = getMappedFileQueueCommitLog();
+        assertEquals(fileCountCommitLog, commitLogQueue.getMappedFiles().size());
+
+        int fileCountConsumeQueue = getFileCountConsumeQueue();
+        MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
+        assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
+
+        int expireFileCount = 15;
+        expireFiles(commitLogQueue, expireFileCount);
+
+        // magic code 10 reference to MappedFileQueue#DELETE_FILES_BATCH_MAX
+        for (int a = 1, fileCount = expireFileCount; a <= (int) Math.ceil((double) expireFileCount / 10); a++, fileCount -= 10) {
+            cleanCommitLogService.run();
+            cleanConsumeQueueService.run();
+
+            int expectDeletedCount = fileCount >= 10 ? a * 10 : ((a - 1) * 10 + fileCount);
+            assertEquals(fileCountCommitLog - expectDeletedCount, commitLogQueue.getMappedFiles().size());
+
+            int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
+            int expectDeleteCountConsumeQueue = (int) Math.floor((double) expectDeletedCount / msgCountPerFile);
+            assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
+        }
+    }
+
+    @Test
+    public void testDeleteFilesImmediatelyBySpaceFull() throws Exception {
+        String deleteWhen = "04";
+        // the min value of diskMaxUsedSpaceRatio. make sure disk space usage is greater than 10%
+        int diskMaxUsedSpaceRatio = 1;
+        // make sure to trigger the automatic file deletion feature
+        double diskSpaceCleanForciblyRatio = 0.01D;
+        initMessageStore(deleteWhen, diskMaxUsedSpaceRatio, diskSpaceCleanForciblyRatio);
+
+        // build and put 55 messages, exactly one message per CommitLog file.
+        buildAndPutMessagesToMessageStore(msgCount);
+
+        // undo comment out the code below, if want to debug this case rather than just run it.
+        // Thread.sleep(1000 * 60 + 100);
+
+        MappedFileQueue commitLogQueue = getMappedFileQueueCommitLog();
+        assertEquals(fileCountCommitLog, commitLogQueue.getMappedFiles().size());
+
+        int fileCountConsumeQueue = getFileCountConsumeQueue();
+        MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
+        assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
+
+        // In this case, there is no need to expire the files.
+        // int expireFileCount = 15;
+        // expireFiles(commitLogQueue, expireFileCount);
+
+        // magic code 10 reference to MappedFileQueue#DELETE_FILES_BATCH_MAX
+        for (int a = 1, fileCount = fileCountCommitLog;
+             a <= (int) Math.ceil((double) fileCountCommitLog / 10) && fileCount >= 10;
+             a++, fileCount -= 10) {
+            cleanCommitLogService.run();
+            cleanConsumeQueueService.run();
+
+            assertEquals(fileCountCommitLog - 10 * a, commitLogQueue.getMappedFiles().size());
+
+            int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
+            int expectDeleteCountConsumeQueue = (int) Math.floor((double) (a * 10) / msgCountPerFile);
+            assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
+        }
+    }
+
+    @Test
+    public void testDeleteExpiredFilesManually() throws Exception {
+        String deleteWhen = "04";
+        // the max value of diskMaxUsedSpaceRatio
+        int diskMaxUsedSpaceRatio = 99;
+        // used to ensure that automatic file deletion is not triggered
+        double diskSpaceCleanForciblyRatio = 0.999D;
+        initMessageStore(deleteWhen, diskMaxUsedSpaceRatio, diskSpaceCleanForciblyRatio);
+
+        messageStore.executeDeleteFilesManually();
+
+        // build and put 55 messages, exactly one message per CommitLog file.
+        buildAndPutMessagesToMessageStore(msgCount);
+
+        // undo comment out the code below, if want to debug this case rather than just run it.
+        // Thread.sleep(1000 * 60 + 100);
+
+        MappedFileQueue commitLogQueue = getMappedFileQueueCommitLog();
+        assertEquals(fileCountCommitLog, commitLogQueue.getMappedFiles().size());
+
+        int fileCountConsumeQueue = getFileCountConsumeQueue();
+        MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
+        assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
+
+        int expireFileCount = 15;
+        expireFiles(commitLogQueue, expireFileCount);
+
+        // magic code 10 reference to MappedFileQueue#DELETE_FILES_BATCH_MAX
+        for (int a = 1, fileCount = expireFileCount; a <= (int) Math.ceil((double) expireFileCount / 10); a++, fileCount -= 10) {
+            cleanCommitLogService.run();
+            cleanConsumeQueueService.run();
+
+            int expectDeletedCount = fileCount >= 10 ? a * 10 : ((a - 1) * 10 + fileCount);
+            assertEquals(fileCountCommitLog - expectDeletedCount, commitLogQueue.getMappedFiles().size());
+
+            int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
+            int expectDeleteCountConsumeQueue = (int) Math.floor((double) expectDeletedCount / msgCountPerFile);
+            assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
+        }
+    }
+
+    private DefaultMessageStore.CleanCommitLogService getCleanCommitLogService(double diskSpaceCleanForciblyRatio)
+            throws Exception {
+        Field serviceField = messageStore.getClass().getDeclaredField("cleanCommitLogService");
+        serviceField.setAccessible(true);
+        DefaultMessageStore.CleanCommitLogService cleanCommitLogService =
+                (DefaultMessageStore.CleanCommitLogService) serviceField.get(messageStore);
+        serviceField.setAccessible(false);
+
+        Field warningLevelRatioField = cleanCommitLogService.getClass().getDeclaredField("diskSpaceWarningLevelRatio");
+        warningLevelRatioField.setAccessible(true);
+        warningLevelRatioField.set(cleanCommitLogService, diskSpaceCleanForciblyRatio);
+        warningLevelRatioField.setAccessible(false);
+
+        Field cleanForciblyRatioField = cleanCommitLogService.getClass().getDeclaredField("diskSpaceCleanForciblyRatio");
+        cleanForciblyRatioField.setAccessible(true);
+        cleanForciblyRatioField.set(cleanCommitLogService, diskSpaceCleanForciblyRatio);
+        cleanForciblyRatioField.setAccessible(false);
+        return cleanCommitLogService;
+    }
+
+    private DefaultMessageStore.CleanConsumeQueueService getCleanConsumeQueueService()
+            throws Exception {
+        Field serviceField = messageStore.getClass().getDeclaredField("cleanConsumeQueueService");
+        serviceField.setAccessible(true);
+        DefaultMessageStore.CleanConsumeQueueService cleanConsumeQueueService =
+                (DefaultMessageStore.CleanConsumeQueueService) serviceField.get(messageStore);
+        serviceField.setAccessible(false);
+        return cleanConsumeQueueService;
+    }
+
+    private MappedFileQueue getMappedFileQueueConsumeQueue()
+            throws Exception {
+        ConsumeQueue consumeQueue = messageStore.getConsumeQueueTable().get(topic).get(queueId);
+        Field queueField = consumeQueue.getClass().getDeclaredField("mappedFileQueue");
+        queueField.setAccessible(true);
+        MappedFileQueue fileQueue = (MappedFileQueue) queueField.get(consumeQueue);
+        queueField.setAccessible(false);
+        return fileQueue;
+    }
+
+    private MappedFileQueue getMappedFileQueueCommitLog() throws Exception {
+        CommitLog commitLog = messageStore.getCommitLog();
+        Field queueField = commitLog.getClass().getDeclaredField("mappedFileQueue");
+        queueField.setAccessible(true);
+        MappedFileQueue fileQueue = (MappedFileQueue) queueField.get(commitLog);
+        queueField.setAccessible(false);
+        return fileQueue;
+    }
+
+    private int getFileCountConsumeQueue() {
+        int countPerFile = getMsgCountPerConsumeQueueMappedFile();
+        double fileCount = (double) msgCount / countPerFile;
+        return (int) Math.ceil(fileCount);
+    }
+
+    private int getMsgCountPerConsumeQueueMappedFile() {
+        int size = messageStore.getMessageStoreConfig().getMapedFileSizeConsumeQueue();
+        return size / CQ_STORE_UNIT_SIZE;// 7 in this case
+    }
+
+    private void buildAndPutMessagesToMessageStore(int msgCount) throws Exception {
+        int msgLen = topic.getBytes(CHARSET_UTF8).length + 91;
+        int commitLogEndFileMinBlankLength = 4 + 4;
+        int singleMsgBodyLen = mappedFileSize - msgLen - commitLogEndFileMinBlankLength;
+
+        for (int i = 0; i < msgCount; i++) {
+            MessageExtBrokerInner msg = new MessageExtBrokerInner();
+            msg.setTopic(topic);
+            msg.setBody(new byte[singleMsgBodyLen]);
+            msg.setKeys(String.valueOf(System.currentTimeMillis()));
+            msg.setQueueId(queueId);
+            msg.setSysFlag(0);
+            msg.setBornTimestamp(System.currentTimeMillis());
+            msg.setStoreHost(storeHost);
+            msg.setBornHost(bornHost);
+            PutMessageResult result = messageStore.putMessage(msg);
+            assertTrue(result != null && result.isOk());
+        }
+
+        // wait for build consumer queue completion
+        Thread.sleep(100);
+    }
+
+    private void expireFiles(MappedFileQueue commitLogQueue, int expireCount) {
+        for (int i = 0; i < commitLogQueue.getMappedFiles().size(); i++) {
+            MappedFile mappedFile = commitLogQueue.getMappedFiles().get(i);
+            int reservedTime = fileReservedTime * 60 * 60 * 1000;
+            if (i < expireCount) {
+                boolean modified = mappedFile.getFile().setLastModified(System.currentTimeMillis() - reservedTime * 2);
+                assertTrue(modified);
+            }
+        }
+    }
+
+    private void initMessageStore(String deleteWhen, int diskMaxUsedSpaceRatio, double diskSpaceCleanForciblyRatio) throws Exception {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfigForTest();
+        messageStoreConfig.setMapedFileSizeCommitLog(mappedFileSize);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(mappedFileSize);
+        messageStoreConfig.setMaxHashSlotNum(10000);
+        messageStoreConfig.setMaxIndexNum(100 * 100);
+        messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
+        messageStoreConfig.setFlushIntervalConsumeQueue(1);
+
+        // Invalidate DefaultMessageStore`s scheduled task of cleaning expired files.
+        // work with the code 'Thread.sleep(1000 * 60 + 100)' behind.
+        messageStoreConfig.setCleanResourceInterval(Integer.MAX_VALUE);
+
+        messageStoreConfig.setFileReservedTime(fileReservedTime);
+        messageStoreConfig.setDeleteWhen(deleteWhen);
+        messageStoreConfig.setDiskMaxUsedSpaceRatio(diskMaxUsedSpaceRatio);
+
+        String storePathRootDir = System.getProperty("user.home") + File.separator
+                + "DefaultMessageStoreCleanFilesTest-" + UUID.randomUUID();
+        String storePathCommitLog = storePathRootDir + File.separator + "commitlog";
+        messageStoreConfig.setStorePathRootDir(storePathRootDir);
+        messageStoreConfig.setStorePathCommitLog(storePathCommitLog);
+
+        messageStore = new DefaultMessageStore(messageStoreConfig,
+                new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig());
+
+        cleanCommitLogService = getCleanCommitLogService(diskSpaceCleanForciblyRatio);
+        cleanConsumeQueueService = getCleanConsumeQueueService();
+
+        assertTrue(messageStore.load());
+        messageStore.start();
+    }
+
+    private class MyMessageArrivingListener implements MessageArrivingListener {
+        @Override
+        public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
+                             byte[] filterBitMap, Map<String, String> properties) {
+        }
+    }
+
+    @After
+    public void destroy() {
+        messageStore.shutdown();
+        messageStore.destroy();
+
+        MessageStoreConfig messageStoreConfig = messageStore.getMessageStoreConfig();
+        File file = new File(messageStoreConfig.getStorePathRootDir());
+        UtilAll.deleteFile(file);
+    }
+
+    private class MessageStoreConfigForTest extends MessageStoreConfig {
+        @Override
+        public int getDiskMaxUsedSpaceRatio() {
+            try {
+                Field diskMaxUsedSpaceRatioField = this.getClass().getSuperclass().getDeclaredField("diskMaxUsedSpaceRatio");
+                diskMaxUsedSpaceRatioField.setAccessible(true);
+                int ratio = (int) diskMaxUsedSpaceRatioField.get(this);
+                diskMaxUsedSpaceRatioField.setAccessible(false);
+                return ratio;
+            } catch (Exception ignored) {
+            }
+            return super.getDiskMaxUsedSpaceRatio();
+        }
+    }
+}