You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/05/01 06:36:13 UTC

[rocketmq] branch develop updated: [ISSUE #6662] Optimize the process of HA's confirmOffset calculation (#6663)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7903c7041 [ISSUE #6662] Optimize the process of HA's confirmOffset calculation (#6663)
7903c7041 is described below

commit 7903c70412fc4d20baafd7b75ca93788551fb88f
Author: Ji Juntao <ju...@alibaba-inc.com>
AuthorDate: Mon May 1 14:35:52 2023 +0800

    [ISSUE #6662] Optimize the process of HA's confirmOffset calculation (#6663)
    
    * When computing confirmOffset, check whether all slaves in the syncStateSet are connected to the master.
    
    * Change brokerId to brokerControllerId.
    
    * Optimize the code: 1. Set brokerId when the ReplicasManager is initialized. 2. Rename a confusing variable.
    
    * Set haService's brokerControllerId after registration.
    
    * Set haService's brokerControllerId when the ReplicasManager starts without a brokerIdentity file.
---
 .../broker/controller/ReplicasManager.java         |  2 ++
 .../store/ha/autoswitch/AutoSwitchHAService.java   | 29 +++++++++++++++++++---
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index c39e33ad1..3c7e061a2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -549,6 +549,7 @@ public class ReplicasManager {
             this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
             this.tempBrokerMetadata.clear();
             this.brokerControllerId = this.brokerMetadata.getBrokerId();
+            this.haService.setBrokerControllerId(this.brokerControllerId);
             return true;
         } catch (Exception e) {
             LOGGER.error("fail to create metadata file", e);
@@ -600,6 +601,7 @@ public class ReplicasManager {
         if (this.brokerMetadata.isLoaded()) {
             this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
             this.brokerControllerId = brokerMetadata.getBrokerId();
+            this.haService.setBrokerControllerId(this.brokerControllerId);
             return;
         }
         // 2. check if temp metadata exist
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 80249bc32..d1e623ca7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -50,6 +51,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /**
  * SwitchAble ha service, support switch role to master or slave.
@@ -72,6 +74,8 @@ public class AutoSwitchHAService extends DefaultHAService {
     private EpochFileCache epochCache;
     private AutoSwitchHAClient haClient;
 
+    private Long brokerControllerId = null;
+
     public AutoSwitchHAService() {
     }
 
@@ -427,14 +431,25 @@ public class AutoSwitchHAService extends DefaultHAService {
 
     private long computeConfirmOffset() {
         final Set<Long> currentSyncStateSet = getSyncStateSet();
-        long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
+        long newConfirmOffset = this.defaultMessageStore.getMaxPhyOffset();
+        List<Long> idList = this.connectionList.stream().map(connection -> ((AutoSwitchHAConnection)connection).getSlaveId()).collect(Collectors.toList());
+
+        // To avoid the syncStateSet is not consistent with connectionList.
+        // Fix issue: https://github.com/apache/rocketmq/issues/6662
+        for (Long syncId : currentSyncStateSet) {
+            if (!idList.contains(syncId) && this.brokerControllerId != null && !Objects.equals(syncId, this.brokerControllerId)) {
+                LOGGER.warn("Slave {} is still in syncStateSet, but has lost its connection. So new offset can't be compute.", syncId);
+                return this.confirmOffset;
+            }
+        }
+
         for (HAConnection connection : this.connectionList) {
             final Long slaveId = ((AutoSwitchHAConnection) connection).getSlaveId();
             if (currentSyncStateSet.contains(slaveId)) {
-                confirmOffset = Math.min(confirmOffset, connection.getSlaveAckOffset());
+                newConfirmOffset = Math.min(newConfirmOffset, connection.getSlaveAckOffset());
             }
         }
-        return confirmOffset;
+        return newConfirmOffset;
     }
 
     public void setSyncStateSet(final Set<Long> syncStateSet) {
@@ -545,6 +560,14 @@ public class AutoSwitchHAService extends DefaultHAService {
         return this.epochCache.getAllEntries();
     }
 
+    public Long getBrokerControllerId() {
+        return brokerControllerId;
+    }
+
+    public void setBrokerControllerId(Long brokerControllerId) {
+        this.brokerControllerId = brokerControllerId;
+    }
+
     class AutoSwitchAcceptSocketService extends AcceptSocketService {
 
         public AutoSwitchAcceptSocketService(final MessageStoreConfig messageStoreConfig) {