You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/22 02:33:51 UTC

[rocketmq] branch develop updated: [RIP-10]Add test case for CommitLog.handleHA (#829)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6bece12  [RIP-10]Add  test case for CommitLog.handleHA (#829)
6bece12 is described below

commit 6bece126da4afdb602fc4c3a0050033262321418
Author: a51764579 <a5...@qq.com>
AuthorDate: Fri Feb 22 10:33:46 2019 +0800

    [RIP-10]Add  test case for CommitLog.handleHA (#829)
    
    * Add test case for CommitLog.handleHA
    
    * Add one UUID parent path for RocketMQ's files
---
 .../apache/rocketmq/store/ConsumeQueueTest.java    |   1 +
 .../java/org/apache/rocketmq/store/HATest.java     | 150 +++++++++++++++++++++
 2 files changed, 151 insertions(+)

diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index 7e01b85..59bd904 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -212,6 +212,7 @@ public class ConsumeQueueTest {
         try {
             try {
                 putMsg(master);
+                Thread.sleep(3000L);//wait ConsumeQueue create success.
             } catch (Exception e) {
                 e.printStackTrace();
                 assertThat(Boolean.FALSE).isTrue();
diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java
new file mode 100644
index 0000000..0a166d9
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.ha.HAService;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * HATest
+ *
+ */
+public class HATest {
+    private final String StoreMessage = "Once, there was a chance for me!";
+    private int QUEUE_TOTAL = 100;
+    private AtomicInteger QueueId = new AtomicInteger(0);
+    private SocketAddress BornHost;
+    private SocketAddress StoreHost;
+    private byte[] MessageBody;
+
+    private MessageStore messageStore;
+    private MessageStore slaveMessageStore;
+    private MessageStoreConfig masterMessageStoreConfig;
+    private MessageStoreConfig slaveStoreConfig;
+    private BrokerStatsManager brokerStatsManager = new BrokerStatsManager("simpleTest");
+    private String storePathRootParentDir = System.getProperty("user.home") + File.separator +
+            UUID.randomUUID().toString().replace("-", "");
+    private String storePathRootDir = storePathRootParentDir + File.separator + "store";
+    @Before
+    public void init() throws Exception {
+        StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+        BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+        masterMessageStoreConfig = new MessageStoreConfig();
+        masterMessageStoreConfig.setBrokerRole(BrokerRole.SYNC_MASTER);
+        masterMessageStoreConfig.setStorePathRootDir(storePathRootDir+File.separator+"master");
+        masterMessageStoreConfig.setStorePathCommitLog(storePathRootDir+File.separator+"master"+ File.separator+"commitlog");
+        buildMessageStoreConfig(masterMessageStoreConfig);
+        slaveStoreConfig = new MessageStoreConfig();
+        slaveStoreConfig.setBrokerRole(BrokerRole.SLAVE);
+        slaveStoreConfig.setStorePathRootDir(storePathRootDir+File.separator+"slave");
+        slaveStoreConfig.setStorePathCommitLog(storePathRootDir+File.separator+"slave"+ File.separator+"commitlog");
+        slaveStoreConfig.setHaListenPort(10943);
+        buildMessageStoreConfig(slaveStoreConfig);
+        messageStore = buildMessageStore(masterMessageStoreConfig,0L);
+        slaveMessageStore = buildMessageStore(slaveStoreConfig,1L);
+        boolean load = messageStore.load();
+        boolean slaveLoad = slaveMessageStore.load();
+        slaveMessageStore.updateHaMasterAddress("127.0.0.1:10912");
+        assertTrue(load);
+        assertTrue(slaveLoad);
+        messageStore.start();
+        slaveMessageStore.start();
+        Thread.sleep(6000L);//because the haClient will wait 5s after the first connectMaster failed,sleep 6s
+    }
+
+    @Test
+    public void testHandleHA() throws Exception{
+        long totalMsgs = 10;
+        QUEUE_TOTAL = 1;
+        MessageBody = StoreMessage.getBytes();
+        for (long i = 0; i < totalMsgs; i++) {
+            messageStore.putMessage(buildMessage());
+        }
+
+        Thread.sleep(1000L);//sleep 1000 ms
+        for (long i = 0; i < totalMsgs; i++) {
+            GetMessageResult result = slaveMessageStore.getMessage("GROUP_A", "FooBar", 0, i, 1024 * 1024, null);
+            assertThat(result).isNotNull();
+            assertTrue(GetMessageStatus.FOUND.equals(result.getStatus()));
+            result.release();
+        }
+    }
+
+    @After
+    public void destroy() throws Exception{
+        Thread.sleep(5000L);
+        slaveMessageStore.shutdown();
+        slaveMessageStore.destroy();
+        messageStore.shutdown();
+        messageStore.destroy();
+        File file = new File(storePathRootParentDir);
+        UtilAll.deleteFile(file);
+    }
+
+    private MessageStore buildMessageStore(MessageStoreConfig messageStoreConfig,long brokerId) throws Exception {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setBrokerId(brokerId);
+        return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig);
+    }
+
+    private void buildMessageStoreConfig(MessageStoreConfig messageStoreConfig){
+        messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10);
+        messageStoreConfig.setMaxHashSlotNum(10000);
+        messageStoreConfig.setMaxIndexNum(100 * 100);
+        messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
+        messageStoreConfig.setFlushIntervalConsumeQueue(1);
+    }
+
+    private MessageExtBrokerInner buildMessage() {
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.setTopic("FooBar");
+        msg.setTags("TAG1");
+        msg.setBody(MessageBody);
+        msg.setKeys(String.valueOf(System.currentTimeMillis()));
+        msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
+        msg.setSysFlag(0);
+        msg.setBornTimestamp(System.currentTimeMillis());
+        msg.setStoreHost(StoreHost);
+        msg.setBornHost(BornHost);
+        return msg;
+    }
+
+}