You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/05/04 03:55:26 UTC

[rocketmq] branch develop updated: [ISSUE #6665] Optimize the process of truncateInvalidMsgs() (#6666)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 709537595 [ISSUE #6665] Optimize the process of truncateInvalidMsgs() (#6666)
709537595 is described below

commit 709537595c790b855e30f5bb499eaa458cb46970
Author: Ji Juntao <ju...@alibaba-inc.com>
AuthorDate: Thu May 4 11:55:04 2023 +0800

    [ISSUE #6665] Optimize the process of truncateInvalidMsgs() (#6666)
    
    * 1. Fix the bug where confirmOffset may be larger than ReputOffset in some cases. 2. Optimize the logic for truncating dirty files.
    
    * Add more logs while expanding SyncStateSet.
    
    * reset the confirmOffset to the truncated offset.
    
    * optimize the log.
    
    * move setConfirmOffset() process into commitLog.
    
    * Add the check (only reset confirmOffset when it is larger than the truncated offset).
---
 store/src/main/java/org/apache/rocketmq/store/CommitLog.java       | 3 +++
 .../main/java/org/apache/rocketmq/store/DefaultMessageStore.java   | 5 +++--
 .../apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java   | 7 ++++++-
 3 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 18cc32179..1c8cb7ab6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -706,6 +706,9 @@ public class CommitLog implements Swappable {
         }
 
         this.mappedFileQueue.truncateDirtyFiles(phyOffset);
+        if (this.confirmOffset > phyOffset) {
+            this.setConfirmOffset(phyOffset);
+        }
     }
 
     protected void onCommitLogAppend(MessageExtBrokerInner msg, AppendMessageResult result, MappedFile commitLogFile) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index ca8f30684..0b1f69ee7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -713,7 +713,6 @@ public class DefaultMessageStore implements MessageStore {
             this.reputMessageService = new ConcurrentReputMessageService();
         }
 
-
         long resetReputOffset = Math.min(oldReputFromOffset, offsetToTruncate);
 
         LOGGER.info("oldReputFromOffset is {}, reset reput from offset to {}", oldReputFromOffset, resetReputOffset);
@@ -3262,5 +3261,7 @@ public class DefaultMessageStore implements MessageStore {
             (this.brokerConfig.isEnableControllerMode() || this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE);
     }
 
-
+    public long getReputFromOffset() {
+        return this.reputMessageService.getReputFromOffset();
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 75ef622ec..3a918ee8e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -263,6 +263,7 @@ public class AutoSwitchHAService extends DefaultHAService {
         this.executorService.submit(() -> {
             syncStateSetChangedListeners.forEach(listener -> listener.accept(newSyncStateSet));
         });
+        LOGGER.info("Notify the syncStateSet has been changed into {}.", newSyncStateSet);
     }
 
     /**
@@ -312,6 +313,8 @@ public class AutoSwitchHAService extends DefaultHAService {
         if (slaveMaxOffset >= confirmOffset) {
             final EpochEntry currentLeaderEpoch = this.epochCache.lastEntry();
             if (slaveMaxOffset >= currentLeaderEpoch.getStartOffset()) {
+                LOGGER.info("The slave {} has caught up, slaveMaxOffset: {}, confirmOffset: {}, epoch: {}, leader epoch startOffset: {}.",
+                        slaveBrokerId, slaveMaxOffset, confirmOffset, currentLeaderEpoch.getEpoch(), currentLeaderEpoch.getStartOffset());
                 currentSyncStateSet.add(slaveBrokerId);
                 markSynchronizingSyncStateSet(currentSyncStateSet);
                 // Notify the upper layer that syncStateSet changed.
@@ -494,7 +497,9 @@ public class AutoSwitchHAService extends DefaultHAService {
         }
 
         boolean doNext = true;
-        long reputFromOffset = this.defaultMessageStore.getMaxPhyOffset() - dispatchBehind;
+
+        // Here we could use reputFromOffset in DefaultMessageStore directly.
+        long reputFromOffset = this.defaultMessageStore.getReputFromOffset();
         do {
             SelectMappedBufferResult result = this.defaultMessageStore.getCommitLog().getData(reputFromOffset);
             if (result == null) {