You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/05/11 03:30:23 UTC

[rocketmq] branch develop updated: [ISSUE #6728] Compute the confirmOffset without considering new connections (#6729)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5dc2e20ef [ISSUE #6728] Compute the confirmOffset without considering new connections (#6729)
5dc2e20ef is described below

commit 5dc2e20efc9866f0f9d4f242d41ad4b4cfc65644
Author: Ji Juntao <ju...@alibaba-inc.com>
AuthorDate: Thu May 11 11:30:16 2023 +0800

    [ISSUE #6728] Compute the confirmOffset without considering new connections (#6729)
    
    * 1. When computing the confirmOffset, ignore the ackOffset of new connections. 2. When computing the confirmOffset, use getConfirmOffsetDirectly() to avoid endless recursion.
    
    * use the calculated slaveAckOffset
    
    * optimize the logic.
---
 .../java/org/apache/rocketmq/store/CommitLog.java  | 22 +++++++++++++++++++++-
 .../apache/rocketmq/store/DefaultMessageStore.java |  8 ++++++++
 .../store/ha/autoswitch/AutoSwitchHAService.java   |  5 +++--
 3 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index ed5e320be..56f19529d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -553,15 +553,35 @@ public class CommitLog implements Swappable {
         }
     }
 
+    // Fetch and compute the newest confirmOffset.
+    // Even if it is just inited.
     public long getConfirmOffset() {
         if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
             if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isFenced()) {
                 if (((AutoSwitchHAService) this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) {
                     return this.defaultMessageStore.getMaxPhyOffset();
                 }
-                // First time compute confirmOffset.
+                // First time it will compute the confirmOffset.
                 if (this.confirmOffset <= 0) {
                     setConfirmOffset(((AutoSwitchHAService) this.defaultMessageStore.getHaService()).computeConfirmOffset());
+                    log.info("Init the confirmOffset to {}.", this.confirmOffset);
+                }
+            }
+            return this.confirmOffset;
+        } else if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
+            return this.confirmOffset;
+        } else {
+            return getMaxOffset();
+        }
+    }
+
+    // Fetch the original confirmOffset's value.
+    // Without checking and re-computing.
+    public long getConfirmOffsetDirectly() {
+        if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
+            if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isFenced()) {
+                if (((AutoSwitchHAService) this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) {
+                    return this.defaultMessageStore.getMaxPhyOffset();
                 }
             }
             return this.confirmOffset;
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 0b1f69ee7..6b0516b04 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1589,11 +1589,19 @@ public class DefaultMessageStore implements MessageStore {
         }
     }
 
+    // Fetch and compute the newest confirmOffset.
+    // Even if it is just inited.
     @Override
     public long getConfirmOffset() {
         return this.commitLog.getConfirmOffset();
     }
 
+    // Fetch the original confirmOffset's value.
+    // Without checking and re-computing.
+    public long getConfirmOffsetDirectly() {
+        return this.commitLog.getConfirmOffsetDirectly();
+    }
+
     @Override
     public void setConfirmOffset(long phyOffset) {
         this.commitLog.setConfirmOffset(phyOffset);
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 3a918ee8e..6dc734e0c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -421,13 +421,14 @@ public class AutoSwitchHAService extends DefaultHAService {
         for (Long syncId : currentSyncStateSet) {
             if (!idList.contains(syncId) && this.brokerControllerId != null && !Objects.equals(syncId, this.brokerControllerId)) {
                 LOGGER.warn("Slave {} is still in syncStateSet, but has lost its connection. So new offset can't be compute.", syncId);
-                return this.defaultMessageStore.getConfirmOffset();
+                // Without check and re-compute, return the confirmOffset's value directly.
+                return this.defaultMessageStore.getConfirmOffsetDirectly();
             }
         }
 
         for (HAConnection connection : this.connectionList) {
             final Long slaveId = ((AutoSwitchHAConnection) connection).getSlaveId();
-            if (currentSyncStateSet.contains(slaveId)) {
+            if (currentSyncStateSet.contains(slaveId) && connection.getSlaveAckOffset() > 0) {
                 newConfirmOffset = Math.min(newConfirmOffset, connection.getSlaveAckOffset());
             }
         }