You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/11/01 11:38:13 UTC

[GitHub] duhengforever closed pull request #501: [ISSUE #467] fix Message missed after recovering from abnormal shutdown

duhengforever closed pull request #501: [ISSUE #467] fix Message missed after recovering from abnormal shutdown
URL: https://github.com/apache/rocketmq/pull/501
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it is merged, the diff is reproduced below for the sake of
provenance:

diff --git a/README.md b/README.md
index 9b1986277..fbbd0d718 100644
--- a/README.md
+++ b/README.md
@@ -31,7 +31,7 @@ It offers a variety of features:
 * Docs: <https://rocketmq.apache.org/docs/quick-start/>
 * Issues: <https://github.com/apache/rocketmq/issues>
 * Ask: <https://stackoverflow.com/questions/tagged/rocketmq>
-* Slack: <https://rocketmq-community.slack.com/>
+* Slack: <https://rocketmq-invite-automation.herokuapp.com/>
  
 
 ----------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index d3caf8d84..03b115164 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -158,7 +158,7 @@ public SelectMappedBufferResult getData(final long offset, final boolean returnF
     /**
      * When the normal exit, data recovery, all memory data have been flush
      */
-    public void recoverNormally() {
+    public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
         boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
         final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
         if (!mappedFiles.isEmpty()) {
@@ -206,6 +206,12 @@ else if (!dispatchRequest.isSuccess()) {
             this.mappedFileQueue.setFlushedWhere(processOffset);
             this.mappedFileQueue.setCommittedWhere(processOffset);
             this.mappedFileQueue.truncateDirtyFiles(processOffset);
+
+            // Clear ConsumeQueue redundant data
+            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
+                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
+                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
+            }
         }
     }
 
@@ -390,7 +396,7 @@ public void setConfirmOffset(long phyOffset) {
         this.confirmOffset = phyOffset;
     }
 
-    public void recoverAbnormally() {
+    public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
         // recover by the minimum time stamp
         boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
         final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
@@ -418,41 +424,41 @@ public void recoverAbnormally() {
                 DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                 int size = dispatchRequest.getMsgSize();
 
-                // Normal data
-                if (size > 0) {
-                    mappedFileOffset += size;
+                if (dispatchRequest.isSuccess()) {
+                    // Normal data
+                    if (size > 0) {
+                        mappedFileOffset += size;
 
-                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
-                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
+                        if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
+                            if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
+                                this.defaultMessageStore.doDispatch(dispatchRequest);
+                            }
+                        } else {
                             this.defaultMessageStore.doDispatch(dispatchRequest);
                         }
-                    } else {
-                        this.defaultMessageStore.doDispatch(dispatchRequest);
                     }
-                }
-                // Intermediate file read error
-                else if (size == -1) {
+                    // Come the end of the file, switch to the next file
+                    // Since the return 0 representatives met last hole, this can
+                    // not be included in truncate offset
+                    else if (size == 0) {
+                        index++;
+                        if (index >= mappedFiles.size()) {
+                            // The current branch under normal circumstances should
+                            // not happen
+                            log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
+                            break;
+                        } else {
+                            mappedFile = mappedFiles.get(index);
+                            byteBuffer = mappedFile.sliceByteBuffer();
+                            processOffset = mappedFile.getFileFromOffset();
+                            mappedFileOffset = 0;
+                            log.info("recover next physics file, " + mappedFile.getFileName());
+                        }
+                    }
+                } else {
                     log.info("recover physics file end, " + mappedFile.getFileName());
                     break;
                 }
-                // Come the end of the file, switch to the next file
-                // Since the return 0 representatives met last hole, this can
-                // not be included in truncate offset
-                else if (size == 0) {
-                    index++;
-                    if (index >= mappedFiles.size()) {
-                        // The current branch under normal circumstances should
-                        // not happen
-                        log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
-                        break;
-                    } else {
-                        mappedFile = mappedFiles.get(index);
-                        byteBuffer = mappedFile.sliceByteBuffer();
-                        processOffset = mappedFile.getFileFromOffset();
-                        mappedFileOffset = 0;
-                        log.info("recover next physics file, " + mappedFile.getFileName());
-                    }
-                }
             }
 
             processOffset += mappedFileOffset;
@@ -461,7 +467,10 @@ else if (size == 0) {
             this.mappedFileQueue.truncateDirtyFiles(processOffset);
 
             // Clear ConsumeQueue redundant data
-            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
+            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
+                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
+                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
+            }
         }
         // Commitlog case files are deleted
         else {
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 1ade7c283..cb0046eb3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1276,12 +1276,12 @@ private boolean loadConsumeQueue() {
     }
 
     private void recover(final boolean lastExitOK) {
-        this.recoverConsumeQueue();
+        long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
 
         if (lastExitOK) {
-            this.commitLog.recoverNormally();
+            this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
         } else {
-            this.commitLog.recoverAbnormally();
+            this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
         }
 
         this.recoverTopicQueueTable();
@@ -1306,12 +1306,18 @@ private void putConsumeQueue(final String topic, final int queueId, final Consum
         }
     }
 
-    private void recoverConsumeQueue() {
+    private long recoverConsumeQueue() {
+        long maxPhysicOffset = -1;
         for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
             for (ConsumeQueue logic : maps.values()) {
                 logic.recover();
+                if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
+                    maxPhysicOffset = logic.getMaxPhysicOffset();
+                }
             }
         }
+
+        return maxPhysicOffset;
     }
 
     private void recoverTopicQueueTable() {
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 20f94f09a..57b6999c4 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -18,9 +18,12 @@
 package org.apache.rocketmq.store;
 
 import java.io.File;
+import java.io.RandomAccessFile;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.OverlappingFileLockException;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -29,6 +32,7 @@
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.junit.After;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.Before;
@@ -171,6 +175,120 @@ public void testPullSize() throws Exception {
         assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10);
     }
 
+    @Test
+    public void testRecover() throws Exception {
+        String topic = "recoverTopic";
+        MessageBody = StoreMessage.getBytes();
+        for (int i = 0; i < 100; i++) {
+            MessageExtBrokerInner messageExtBrokerInner = buildMessage();
+            messageExtBrokerInner.setTopic(topic);
+            messageExtBrokerInner.setQueueId(0);
+            messageStore.putMessage(messageExtBrokerInner);
+        }
+
+        Thread.sleep(100);//wait for build consumer queue
+        long maxPhyOffset = messageStore.getMaxPhyOffset();
+        long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
+
+        //1.just reboot
+        messageStore.shutdown();
+        messageStore = buildMessageStore();
+        boolean load = messageStore.load();
+        assertTrue(load);
+        messageStore.start();
+        assertTrue(maxPhyOffset == messageStore.getMaxPhyOffset());
+        assertTrue(maxCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
+
+        //2.damage commitlog and reboot normal
+        for (int i = 0; i < 100; i++) {
+            MessageExtBrokerInner messageExtBrokerInner = buildMessage();
+            messageExtBrokerInner.setTopic(topic);
+            messageExtBrokerInner.setQueueId(0);
+            messageStore.putMessage(messageExtBrokerInner);
+        }
+        Thread.sleep(100);
+        long secondLastPhyOffset = messageStore.getMaxPhyOffset();
+        long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
+
+        MessageExtBrokerInner messageExtBrokerInner = buildMessage();
+        messageExtBrokerInner.setTopic(topic);
+        messageExtBrokerInner.setQueueId(0);
+        messageStore.putMessage(messageExtBrokerInner);
+
+        messageStore.shutdown();
+
+        //damage last message
+        damageCommitlog(secondLastPhyOffset);
+
+        //reboot
+        messageStore = buildMessageStore();
+        load = messageStore.load();
+        assertTrue(load);
+        messageStore.start();
+        assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
+        assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
+
+        //3.damage commitlog and reboot abnormal
+        for (int i = 0; i < 100; i++) {
+            messageExtBrokerInner = buildMessage();
+            messageExtBrokerInner.setTopic(topic);
+            messageExtBrokerInner.setQueueId(0);
+            messageStore.putMessage(messageExtBrokerInner);
+        }
+        Thread.sleep(100);
+        secondLastPhyOffset = messageStore.getMaxPhyOffset();
+        secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
+
+        messageExtBrokerInner = buildMessage();
+        messageExtBrokerInner.setTopic(topic);
+        messageExtBrokerInner.setQueueId(0);
+        messageStore.putMessage(messageExtBrokerInner);
+        messageStore.shutdown();
+
+        //damage last message
+        damageCommitlog(secondLastPhyOffset);
+        //add abort file
+        String fileName = StorePathConfigHelper.getAbortFile(((DefaultMessageStore) messageStore).getMessageStoreConfig().getStorePathRootDir());
+        File file = new File(fileName);
+        MappedFile.ensureDirOK(file.getParent());
+        file.createNewFile();
+
+        messageStore = buildMessageStore();
+        load = messageStore.load();
+        assertTrue(load);
+        messageStore.start();
+        assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
+        assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
+
+        //message write again
+        for (int i = 0; i < 100; i++) {
+            messageExtBrokerInner = buildMessage();
+            messageExtBrokerInner.setTopic(topic);
+            messageExtBrokerInner.setQueueId(0);
+            messageStore.putMessage(messageExtBrokerInner);
+        }
+    }
+
+    private void damageCommitlog(long offset) throws Exception {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        File file = new File(messageStoreConfig.getStorePathCommitLog() + File.separator + "00000000000000000000");
+
+        FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
+        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 10);
+
+        int bodyLen = mappedByteBuffer.getInt((int) offset + 84);
+        int topicLenIndex = (int) offset + 84 + bodyLen + 4;
+        mappedByteBuffer.position(topicLenIndex);
+        mappedByteBuffer.putInt(0);
+        mappedByteBuffer.putInt(0);
+        mappedByteBuffer.putInt(0);
+        mappedByteBuffer.putInt(0);
+
+        mappedByteBuffer.force();
+        fileChannel.force(true);
+        fileChannel.close();
+    }
+
     private class MyMessageArrivingListener implements MessageArrivingListener {
         @Override
         public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services