You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2017/12/13 11:40:44 UTC

[rocketmq] branch develop updated: [ROCKETMQ-320]Message loss when shutdown with dispatch behind (#197)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c674137  [ROCKETMQ-320]Message loss when shutdown with dispatch behind (#197)
c674137 is described below

commit c67413749d60682a3393f824487da0478311dd0a
Author: Zhendong Liu <zh...@yeah.net>
AuthorDate: Wed Dec 13 19:40:42 2017 +0800

    [ROCKETMQ-320]Message loss when shutdown with dispatch behind (#197)
---
 .../apache/rocketmq/store/DefaultMessageStore.java |  5 +-
 .../store/DefaultMessageStoreShuwDownTest.java     | 76 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 192d097..7a5647c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -110,6 +110,8 @@ public class DefaultMessageStore implements MessageStore {
 
     private FileLock lock;
 
+    boolean shutDownNormal = false;
+
     public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
         final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
         this.messageArrivingListener = messageArrivingListener;
@@ -265,8 +267,9 @@ public class DefaultMessageStore implements MessageStore {
             this.storeCheckpoint.flush();
             this.storeCheckpoint.shutdown();
 
-            if (this.runningFlags.isWriteable()) {
+            if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
                 this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
+                shutDownNormal = true;
             } else {
                 log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
             }
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShuwDownTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShuwDownTest.java
new file mode 100644
index 0000000..ac85d59
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShuwDownTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import java.io.File;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMessageStoreShuwDownTest {
+    private DefaultMessageStore messageStore;
+
+    @Before
+    public void init() throws Exception {
+        messageStore = spy(buildMessageStore());
+        boolean load = messageStore.load();
+        when(messageStore.dispatchBehindBytes()).thenReturn(100L);
+        assertTrue(load);
+        messageStore.start();
+    }
+
+    @Test
+    public void testDispatchBehindWhenShutDown() {
+        messageStore.shutdown();
+        assertTrue(!messageStore.shutDownNormal);
+        File file = new File(StorePathConfigHelper.getAbortFile(messageStore.getMessageStoreConfig().getStorePathRootDir()));
+        assertTrue(file.exists());
+    }
+
+    @After
+    public void destory() {
+        messageStore.destroy();
+        File file = new File(messageStore.getMessageStoreConfig().getStorePathRootDir());
+        UtilAll.deleteFile(file);
+    }
+
+    public DefaultMessageStore buildMessageStore() throws Exception {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10);
+        messageStoreConfig.setMaxHashSlotNum(10000);
+        messageStoreConfig.setMaxIndexNum(100 * 100);
+        messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
+        return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), null, new BrokerConfig());
+    }
+
+
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@rocketmq.apache.org" <co...@rocketmq.apache.org>'].