You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/20 08:21:21 UTC
[rocketmq] branch develop updated: [RIP-10] Add test cases for
DefaultMessageStore.putMessages (#777)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 95db64d [RIP-10] Add test cases for DefaultMessageStore.putMessages (#777)
95db64d is described below
commit 95db64dcb47abba8d4dc68e06b0ceca3fe426213
Author: zhoubo <zh...@outlook.com>
AuthorDate: Wed Feb 20 16:21:15 2019 +0800
[RIP-10] Add test cases for DefaultMessageStore.putMessages (#777)
[RIP-10]Add test cases for DefaultMessageStore.putMessages(MessageExtBatch)
---
.../apache/rocketmq/store/BatchPutMessageTest.java | 170 +++++++++++++++++++++
1 file changed, 170 insertions(+)
diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
new file mode 100644
index 0000000..8de0a2c
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class BatchPutMessageTest {
+
+ private MessageStore messageStore;
+
+ public static final char NAME_VALUE_SEPARATOR = 1;
+ public static final char PROPERTY_SEPARATOR = 2;
+ public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+
+ @Before
+ public void init() throws Exception {
+ messageStore = buildMessageStore();
+ boolean load = messageStore.load();
+ assertTrue(load);
+ messageStore.start();
+ }
+
+ @After
+ public void destory() {
+ messageStore.shutdown();
+ messageStore.destroy();
+
+ UtilAll.deleteFile(new File(System.getProperty("user.home") + File.separator + "putmessagesteststore"));
+ }
+
+ private MessageStore buildMessageStore() throws Exception {
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
+ messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
+ messageStoreConfig.setMaxHashSlotNum(100);
+ messageStoreConfig.setMaxIndexNum(100 * 10);
+ messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
+ messageStoreConfig.setFlushIntervalConsumeQueue(1);
+ messageStoreConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "putmessagesteststore");
+ messageStoreConfig.setStorePathCommitLog(System.getProperty("user.home") + File.separator + "putmessagesteststore" + File.separator + "commitlog");
+ return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MyMessageArrivingListener(), new BrokerConfig());
+ }
+
+ @Test
+ public void testPutMessages() {
+ List<Message> messages = new ArrayList<>();
+ String topic = "batch-write-topic";
+ int queue = 0;
+ int[] msgLengthArr = new int[11];
+ msgLengthArr[0] = 0;
+ int j = 1;
+ for (int i = 0; i < 10; i++) {
+ Message msg = new Message();
+ msg.setBody(("body" + i).getBytes());
+ msg.setTopic(topic);
+ msg.setTags("TAG1");
+ msg.setKeys(String.valueOf(System.currentTimeMillis()));
+ messages.add(msg);
+ String properties = messageProperties2String(msg.getProperties());
+ byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
+ short propertiesLength = (short) propertiesBytes.length;
+ final byte[] topicData = msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+ final int topicLength = topicData.length;
+ msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength) + msgLengthArr[j - 1];
+ j++;
+ }
+ byte[] batchMessageBody = MessageDecoder.encodeMessages(messages);
+ MessageExtBatch messageExtBatch = new MessageExtBatch();
+ messageExtBatch.setTopic(topic);
+ messageExtBatch.setQueueId(queue);
+ messageExtBatch.setBody(batchMessageBody);
+ messageExtBatch.setBornTimestamp(System.currentTimeMillis());
+ messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 125));
+ messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 126));
+
+ PutMessageResult putMessageResult = messageStore.putMessages(messageExtBatch);
+ assertThat(putMessageResult.isOk()).isTrue();
+
+ for (long i = 0; i < 10; i++) {
+ MessageExt messageExt = messageStore.lookMessageByOffset(msgLengthArr[(int) i]);
+ assertThat(messageExt).isNotNull();
+ GetMessageResult result = messageStore.getMessage("batch_write_group", topic, queue, i, 1024 * 1024, null);
+ assertThat(result).isNotNull();
+ assertThat(result.getStatus()).isEqualTo(GetMessageStatus.FOUND);
+ result.release();
+ }
+
+ }
+
+ private int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
+ final int msgLen = 4 //TOTALSIZE
+ + 4 //MAGICCODE
+ + 4 //BODYCRC
+ + 4 //QUEUEID
+ + 4 //FLAG
+ + 8 //QUEUEOFFSET
+ + 8 //PHYSICALOFFSET
+ + 4 //SYSFLAG
+ + 8 //BORNTIMESTAMP
+ + 8 //BORNHOST
+ + 8 //STORETIMESTAMP
+ + 8 //STOREHOSTADDRESS
+ + 4 //RECONSUMETIMES
+ + 8 //Prepared Transaction Offset
+ + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
+ + 1 + topicLength //TOPIC
+ + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
+ + 0;
+ return msgLen;
+ }
+
+ public String messageProperties2String(Map<String, String> properties) {
+ StringBuilder sb = new StringBuilder();
+ if (properties != null) {
+ for (final Map.Entry<String, String> entry : properties.entrySet()) {
+ final String name = entry.getKey();
+ final String value = entry.getValue();
+
+ sb.append(name);
+ sb.append(NAME_VALUE_SEPARATOR);
+ sb.append(value);
+ sb.append(PROPERTY_SEPARATOR);
+ }
+ }
+ return sb.toString();
+ }
+
+ private class MyMessageArrivingListener implements MessageArrivingListener {
+ @Override
+ public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
+ byte[] filterBitMap, Map<String, String> properties) {
+ }
+ }
+}