You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/03/06 13:44:38 UTC

[rocketmq] branch develop updated: [RIP-10] optimization test case of DefaultMessageStore (#995)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new ede33c1  [RIP-10] optimization test case of DefaultMessageStore (#995)
ede33c1 is described below

commit ede33c1d17f7c71fa216e7dba49bbc8b301f2429
Author: hdchen <14...@qq.com>
AuthorDate: Wed Mar 6 21:44:31 2019 +0800

    [RIP-10] optimization test case of DefaultMessageStore (#995)
---
 .../rocketmq/store/DefaultMessageStoreTest.java    | 51 +++++++++++++++++-----
 1 file changed, 39 insertions(+), 12 deletions(-)

diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index ad4ca91..d0f7293 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
@@ -123,6 +124,8 @@ public class DefaultMessageStoreTest {
             messageStore.putMessage(buildMessage());
         }
 
+        StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
+
         for (long i = 0; i < totalMsgs; i++) {
             GetMessageResult result = messageStore.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
             assertThat(result).isNotNull();
@@ -180,7 +183,8 @@ public class DefaultMessageStoreTest {
         int queueId = 0;
         String topic = "FooBar";
         AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
-        Thread.sleep(10);
+        //Thread.sleep(10);
+        StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
 
         ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
         for (AppendMessageResult appendMessageResult : appendMessageResults) {
@@ -198,7 +202,8 @@ public class DefaultMessageStoreTest {
         int queueId = 0;
         String topic = "FooBar";
         AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
-        Thread.sleep(10);
+        //Thread.sleep(10);
+        StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
         int skewing = 2;
 
         ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
@@ -222,7 +227,8 @@ public class DefaultMessageStoreTest {
         int queueId = 0;
         String topic = "FooBar";
         AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
-        Thread.sleep(10);
+        //Thread.sleep(10);
+        StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
         int skewing = 20000;
 
         ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
@@ -235,6 +241,9 @@ public class DefaultMessageStoreTest {
             assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResults[totalCount - 1].getWroteBytes());
             assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResults[0].getWroteOffset());
             assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResults[0].getWroteBytes());
+
+            indexBuffer.release();
+            indexBuffer2.release();
         }
     }
 
@@ -245,7 +254,9 @@ public class DefaultMessageStoreTest {
         int wrongQueueId = 1;
         String topic = "FooBar";
         AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
-        Thread.sleep(10);
+        //Thread.sleep(10);
+
+        StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
 
         long offset = messageStore.getOffsetInQueueByTime(topic, wrongQueueId, appendMessageResults[0].getStoreTimestamp());
 
@@ -259,7 +270,8 @@ public class DefaultMessageStoreTest {
         int wrongQueueId = 1;
         String topic = "FooBar";
         putMessages(totalCount, topic, queueId, false);
-        Thread.sleep(10);
+        //Thread.sleep(10);
+        StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
 
         long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, 0);
 
@@ -273,7 +285,9 @@ public class DefaultMessageStoreTest {
         int wrongQueueId = 1;
         String topic = "FooBar";
         putMessages(totalCount, topic, queueId, true);
-        Thread.sleep(10);
+        //Thread.sleep(10);
+
+        StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
 
         long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, -1);
 
@@ -287,7 +301,8 @@ public class DefaultMessageStoreTest {
         int queueId = 0;
         String topic = "FooBar";
         AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
-        Thread.sleep(10);
+        //Thread.sleep(10);
+        StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
 
         ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
         int minOffsetInQueue = (int)consumeQueue.getMinOffsetInQueue();
@@ -310,7 +325,8 @@ public class DefaultMessageStoreTest {
         int queueId = 0;
         String topic = "FooBar";
         AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
-        Thread.sleep(10);
+        //Thread.sleep(10);
+        StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
         ConsumeQueue consumeQueue = messageStore.getConsumeQueue(topic, queueId);
 
         for (int i = 0; i < totalCount; i++) {
@@ -412,6 +428,8 @@ public class DefaultMessageStoreTest {
             master.putMessage(buildMessage());
         }
 
+        StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
+
         for (long i = 0; i < totalMsgs; i++) {
             GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
             assertThat(result).isNotNull();
@@ -432,16 +450,21 @@ public class DefaultMessageStoreTest {
         }
         // wait for consume queue build
         // the sleep time should be great than consume queue flush interval
-        Thread.sleep(100);
+        //Thread.sleep(100);
+        StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
         String group = "simple";
         GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null);
         assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32);
+        getMessageResult32.release();
 
         GetMessageResult getMessageResult20 = messageStore.getMessage(group, topic, 0, 0, 20, null);
         assertThat(getMessageResult20.getMessageBufferList().size()).isEqualTo(20);
 
+        getMessageResult20.release();
         GetMessageResult getMessageResult45 = messageStore.getMessage(group, topic, 0, 0, 10, null);
         assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10);
+        getMessageResult45.release();
+
     }
 
     @Test
@@ -455,7 +478,9 @@ public class DefaultMessageStoreTest {
             messageStore.putMessage(messageExtBrokerInner);
         }
 
-        Thread.sleep(100);//wait for build consumer queue
+       // Thread.sleep(100);//wait for build consumer queue
+        StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
+
         long maxPhyOffset = messageStore.getMaxPhyOffset();
         long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
 
@@ -475,7 +500,8 @@ public class DefaultMessageStoreTest {
             messageExtBrokerInner.setQueueId(0);
             messageStore.putMessage(messageExtBrokerInner);
         }
-        Thread.sleep(100);
+        //Thread.sleep(100);
+        StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
         long secondLastPhyOffset = messageStore.getMaxPhyOffset();
         long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
 
@@ -504,7 +530,8 @@ public class DefaultMessageStoreTest {
             messageExtBrokerInner.setQueueId(0);
             messageStore.putMessage(messageExtBrokerInner);
         }
-        Thread.sleep(100);
+        //Thread.sleep(100);
+        StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
         secondLastPhyOffset = messageStore.getMaxPhyOffset();
         secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);