You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/03/06 13:44:38 UTC
[rocketmq] branch develop updated: [RIP-10] optimization test case
of DefaultMessageStore (#995)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new ede33c1 [RIP-10] optimization test case of DefaultMessageStore (#995)
ede33c1 is described below
commit ede33c1d17f7c71fa216e7dba49bbc8b301f2429
Author: hdchen <14...@qq.com>
AuthorDate: Wed Mar 6 21:44:31 2019 +0800
[RIP-10] optimization test case of DefaultMessageStore (#995)
---
.../rocketmq/store/DefaultMessageStoreTest.java | 51 +++++++++++++++++-----
1 file changed, 39 insertions(+), 12 deletions(-)
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index ad4ca91..d0f7293 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@@ -123,6 +124,8 @@ public class DefaultMessageStoreTest {
messageStore.putMessage(buildMessage());
}
+ StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
+
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = messageStore.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
assertThat(result).isNotNull();
@@ -180,7 +183,8 @@ public class DefaultMessageStoreTest {
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
- Thread.sleep(10);
+ //Thread.sleep(10);
+ StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
for (AppendMessageResult appendMessageResult : appendMessageResults) {
@@ -198,7 +202,8 @@ public class DefaultMessageStoreTest {
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
- Thread.sleep(10);
+ //Thread.sleep(10);
+ StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
int skewing = 2;
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
@@ -222,7 +227,8 @@ public class DefaultMessageStoreTest {
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
- Thread.sleep(10);
+ //Thread.sleep(10);
+ StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
int skewing = 20000;
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
@@ -235,6 +241,9 @@ public class DefaultMessageStoreTest {
assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResults[totalCount - 1].getWroteBytes());
assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResults[0].getWroteOffset());
assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResults[0].getWroteBytes());
+
+ indexBuffer.release();
+ indexBuffer2.release();
}
}
@@ -245,7 +254,9 @@ public class DefaultMessageStoreTest {
int wrongQueueId = 1;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
- Thread.sleep(10);
+ //Thread.sleep(10);
+
+ StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
long offset = messageStore.getOffsetInQueueByTime(topic, wrongQueueId, appendMessageResults[0].getStoreTimestamp());
@@ -259,7 +270,8 @@ public class DefaultMessageStoreTest {
int wrongQueueId = 1;
String topic = "FooBar";
putMessages(totalCount, topic, queueId, false);
- Thread.sleep(10);
+ //Thread.sleep(10);
+ StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, 0);
@@ -273,7 +285,9 @@ public class DefaultMessageStoreTest {
int wrongQueueId = 1;
String topic = "FooBar";
putMessages(totalCount, topic, queueId, true);
- Thread.sleep(10);
+ //Thread.sleep(10);
+
+ StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, -1);
@@ -287,7 +301,8 @@ public class DefaultMessageStoreTest {
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
- Thread.sleep(10);
+ //Thread.sleep(10);
+ StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
int minOffsetInQueue = (int)consumeQueue.getMinOffsetInQueue();
@@ -310,7 +325,8 @@ public class DefaultMessageStoreTest {
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
- Thread.sleep(10);
+ //Thread.sleep(10);
+ StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
ConsumeQueue consumeQueue = messageStore.getConsumeQueue(topic, queueId);
for (int i = 0; i < totalCount; i++) {
@@ -412,6 +428,8 @@ public class DefaultMessageStoreTest {
master.putMessage(buildMessage());
}
+ StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
+
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
assertThat(result).isNotNull();
@@ -432,16 +450,21 @@ public class DefaultMessageStoreTest {
}
// wait for consume queue build
// the sleep time should be greater than consume queue flush interval
- Thread.sleep(100);
+ //Thread.sleep(100);
+ StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
String group = "simple";
GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null);
assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32);
+ getMessageResult32.release();
GetMessageResult getMessageResult20 = messageStore.getMessage(group, topic, 0, 0, 20, null);
assertThat(getMessageResult20.getMessageBufferList().size()).isEqualTo(20);
+ getMessageResult20.release();
GetMessageResult getMessageResult45 = messageStore.getMessage(group, topic, 0, 0, 10, null);
assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10);
+ getMessageResult45.release();
+
}
@Test
@@ -455,7 +478,9 @@ public class DefaultMessageStoreTest {
messageStore.putMessage(messageExtBrokerInner);
}
- Thread.sleep(100);//wait for build consumer queue
+ // Thread.sleep(100);//wait for build consumer queue
+ StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
+
long maxPhyOffset = messageStore.getMaxPhyOffset();
long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
@@ -475,7 +500,8 @@ public class DefaultMessageStoreTest {
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
- Thread.sleep(100);
+ //Thread.sleep(100);
+ StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
long secondLastPhyOffset = messageStore.getMaxPhyOffset();
long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
@@ -504,7 +530,8 @@ public class DefaultMessageStoreTest {
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
- Thread.sleep(100);
+ //Thread.sleep(100);
+ StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
secondLastPhyOffset = messageStore.getMaxPhyOffset();
secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);