You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/06/04 14:42:07 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: Polish switching logic and auto switch ha code (#4406)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new f27800a13 Polish switching logic and auto switch ha code (#4406)
f27800a13 is described below

commit f27800a134d1a595fcf9654319ea6db8a9cf1713
Author: rongtong <ji...@163.com>
AuthorDate: Sat Jun 4 22:42:01 2022 +0800

    Polish switching logic and auto switch ha code (#4406)
    
    * Polish switching logic and auto switch ha code
    
    * Make UT can pass
    
    * Polish the code
---
 .../rocketmq/broker/hacontroller/ReplicasManager.java     | 13 +++++++++----
 .../controller/impl/manager/ReplicasInfoManager.java      | 15 +++++++--------
 .../main/java/org/apache/rocketmq/store/CommitLog.java    |  2 ++
 .../store/ha/autoswitch/AutoSwitchHAConnection.java       |  4 +++-
 .../rocketmq/store/ha/autoswitch/AutoSwitchHAService.java |  4 +---
 .../rocketmq/store/ha/autoswitch/AutoSwitchHATest.java    |  4 ++++
 6 files changed, 26 insertions(+), 16 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index 48f61f5e4..94bfa5d63 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -169,6 +169,9 @@ public class ReplicasManager {
                 // Handle the slave synchronise
                 handleSlaveSynchronize(BrokerRole.SYNC_MASTER);
 
+                // Notify ha service, change to master
+                this.haService.changeToMaster(newMasterEpoch);
+
                 this.executorService.submit(() -> {
                     // Register broker to name-srv
                     try {
@@ -177,8 +180,6 @@ public class ReplicasManager {
                         LOGGER.error("Error happen when register broker to name-srv, Failed to change broker to master", e);
                         return;
                     }
-                    // Notify ha service, change to master
-                    this.haService.changeToMaster(newMasterEpoch);
                     LOGGER.info("Change broker {} to master success, masterEpoch {}, syncStateSetEpoch:{}", this.localAddress, newMasterEpoch, syncStateSetEpoch);
                 });
             }
@@ -193,6 +194,7 @@ public class ReplicasManager {
                 // Change record
                 this.masterAddress = newMasterAddress;
                 this.masterEpoch = newMasterEpoch;
+
                 stopCheckSyncStateSet();
 
                 // Change config
@@ -203,6 +205,9 @@ public class ReplicasManager {
                 // Handle the slave synchronise
                 handleSlaveSynchronize(BrokerRole.SLAVE);
 
+                // Notify ha service, change to slave
+                this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId());
+
                 this.executorService.submit(() -> {
                     // Register broker to name-srv
                     try {
@@ -212,8 +217,6 @@ public class ReplicasManager {
                         return;
                     }
 
-                    // Notify ha service, change to slave
-                    this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId());
                     LOGGER.info("Change broker {} to slave, newMasterAddress:{}, newMasterEpoch:{}", this.localAddress, newMasterAddress, newMasterEpoch);
                 });
             }
@@ -227,6 +230,8 @@ public class ReplicasManager {
                 this.syncStateSetEpoch = newSyncStateSetEpoch;
                 this.syncStateSet = new HashSet<>(newSyncStateSet);
                 this.haService.setSyncStateSet(newSyncStateSet);
+            } else {
+                LOGGER.info("Sync state set changed failed, newSyncStateSetEpoch is {} and syncStateSetEpoch is {}", newSyncStateSetEpoch, this.syncStateSetEpoch);
             }
         }
     }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 5e718f9a9..e4b042dae 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -49,10 +49,9 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
 /**
- * The manager that manages the replicas info for all brokers.
- * We can think of this class as the controller's memory state machine
- * It should be noted that this class is not thread safe,
- * and the upper layer needs to ensure that it can be called sequentially
+ * The manager that manages the replicas info for all brokers. We can think of this class as the controller's memory
+ * state machine. It should be noted that this class is not thread safe, and the upper layer needs to ensure that it can
+ * be called sequentially.
  */
 public class ReplicasInfoManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
@@ -131,7 +130,7 @@ public class ReplicasInfoManager {
             // Generate event
             int epoch = syncStateInfo.getSyncStateSetEpoch() + 1;
             response.setNewSyncStateSetEpoch(epoch);
-            result.setBody(new org.apache.rocketmq.common.protocol.body.SyncStateSet(newSyncStateSet, epoch).encode());
+            result.setBody(new SyncStateSet(newSyncStateSet, epoch).encode());
             final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet);
             result.addEvent(event);
             return result;
@@ -273,7 +272,7 @@ public class ReplicasInfoManager {
             if (StringUtils.isNotEmpty(request.getBrokerAddress())) {
                 response.setBrokerId(brokerInfo.getBrokerId(request.getBrokerAddress()));
             }
-            result.setBody(new org.apache.rocketmq.common.protocol.body.SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode());
+            result.setBody(new SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode());
             return result;
         }
         result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, "Broker metadata is not existed");
@@ -369,8 +368,8 @@ public class ReplicasInfoManager {
             final String clusterName = event.getClusterName();
             final BrokerInfo brokerInfo = new BrokerInfo(clusterName, brokerName);
             brokerInfo.addBroker(newMaster, 1L);
-            final SyncStateInfo replicasInfo = new SyncStateInfo(clusterName, brokerName, newMaster);
-            this.syncStateSetInfoTable.put(brokerName, replicasInfo);
+            final SyncStateInfo syncStateInfo = new SyncStateInfo(clusterName, brokerName, newMaster);
+            this.syncStateSetInfoTable.put(brokerName, syncStateInfo);
             this.replicaInfoTable.put(brokerName, brokerInfo);
         }
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 5a96b5345..98b4fc937 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -352,6 +352,8 @@ public class CommitLog implements Swappable {
                 log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
                 this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
             }
+
+
         } else {
             // Commitlog case files are deleted
             log.warn("The commitlog files are deleted, and delete the consume queue files");
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index d8a26e20e..dfd3215d2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -328,7 +328,9 @@ public class AutoSwitchHAConnection implements HAConnection {
                                     slaveRequestOffset = slaveMaxOffset;
                                 }
                                 byteBufferRead.position(readSocketPos);
-                                maybeExpandInSyncStateSet(slaveMaxOffset);
+                                if (!haService.getSyncStateSet().contains(slaveAddress)) {
+                                    maybeExpandInSyncStateSet(slaveMaxOffset);
+                                }
                                 AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
                                 LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
                                 break;
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 84b047e2d..38758d663 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -100,9 +100,6 @@ public class AutoSwitchHAService extends DefaultHAService {
 
         this.defaultMessageStore.recoverTopicQueueTable();
 
-        final HashSet<String> newSyncStateSet = new HashSet<>();
-        newSyncStateSet.add(this.localAddress);
-        setSyncStateSet(newSyncStateSet);
         LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
         return true;
     }
@@ -122,6 +119,7 @@ public class AutoSwitchHAService extends DefaultHAService {
             this.haClient.setLocalAddress(this.localAddress);
             this.haClient.updateSlaveId(slaveId);
             this.haClient.updateMasterAddress(newMasterAddr);
+            this.haClient.updateHaMasterAddress(null);
             this.haClient.start();
             LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
             return true;
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 7070db627..dfbc35f81 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -21,6 +21,8 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -189,6 +191,7 @@ public class AutoSwitchHATest {
     public void testAsyncLearnerBrokerRole() throws Exception {
         init(defaultMappedFileSize);
         ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
+        ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
         ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
 
         storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER);
@@ -217,6 +220,7 @@ public class AutoSwitchHATest {
         init(defaultMappedFileSize, true);
         AtomicReference<Set<String>> syncStateSet = new AtomicReference<>();
         ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
+        ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
         ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
         ((AutoSwitchHAService) this.messageStore1.getHaService()).registerSyncStateSetChangedListener((newSyncStateSet) -> {
             System.out.println("Get newSyncStateSet:" + newSyncStateSet);