You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/06/12 08:01:27 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of code] Let the broker register with the controller again when no master exists. (#4450)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new 5eaeeabf5 [Summer of code] Let the broker register with the controller again when no master exists. (#4450)
5eaeeabf5 is described below

commit 5eaeeabf51618d7f31e19816017eaafa2f287a2f
Author: hzh0425 <64...@qq.com>
AuthorDate: Sun Jun 12 16:01:08 2022 +0800

    [Summer of code] Let the broker register with the controller again when no master exists. (#4450)
    
    * Register with the controller if the master is null
    
    * Stop electMaster when the old master is still alive
    
    * Address review comments
    
    * Fix test bugs for brokerAlivePredicate
---
 .../broker/hacontroller/ReplicasManager.java       | 25 +++++++++++++---------
 .../controller/impl/DLedgerController.java         |  8 +++++--
 .../impl/manager/ReplicasInfoManager.java          |  9 ++++++++
 .../controller/impl/DLedgerControllerTest.java     | 14 ++++++++++++
 .../impl/manager/ReplicasInfoManagerTest.java      |  4 ++--
 5 files changed, 46 insertions(+), 14 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index 88a93abd9..65b0d3fc3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -300,18 +300,23 @@ public class ReplicasManager {
                 final long brokerId = info.getBrokerId();
                 synchronized (this) {
                     // Check if master changed
-                    if (StringUtils.isNoneEmpty(newMasterAddress) && newMasterEpoch > this.masterEpoch) {
-                        if (StringUtils.equals(newMasterAddress, this.localAddress)) {
-                            changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch());
-                        } else {
-                            if (brokerId > 0) {
-                                changeToSlave(newMasterAddress, newMasterEpoch, brokerId);
-                            } else if (brokerId < 0) {
-                                // If the brokerId is no existed, we should try register again.
-                                registerBrokerToController();
+                    if (newMasterEpoch > this.masterEpoch) {
+                        if (StringUtils.isNoneEmpty(newMasterAddress)) {
+                            if (StringUtils.equals(newMasterAddress, this.localAddress)) {
+                                changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch());
+                            } else {
+                                if (brokerId > 0) {
+                                    changeToSlave(newMasterAddress, newMasterEpoch, brokerId);
+                                } else if (brokerId < 0) {
+                                    // If the brokerId is no existed, we should try register again.
+                                    registerBrokerToController();
+                                }
                             }
+                        } else {
+                            // In this case, the master in controller is null, try register to controller again, this will trigger the electMasterEvent in controller.
+                            registerBrokerToController();
                         }
-                    } else {
+                    } else if (newMasterEpoch == this.masterEpoch) {
                         // Check if sync state set changed
                         if (isMasterState()) {
                             changeSyncStateSet(syncStateSet.getSyncStateSet(), syncStateSet.getSyncStateSetEpoch());
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index e99036817..67f293ad1 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -69,13 +69,13 @@ public class DLedgerController implements Controller {
     private final DLedgerServer dLedgerServer;
     private final ControllerConfig controllerConfig;
     private final DLedgerConfig dLedgerConfig;
-    // Usr for checking whether the broker is alive
-    private final BiPredicate<String, String> brokerAlivePredicate;
     private final ReplicasInfoManager replicasInfoManager;
     private final EventScheduler scheduler;
     private final EventSerializer eventSerializer;
     private final RoleChangeHandler roleHandler;
     private final DLedgerControllerStateMachine statemachine;
+    // Usr for checking whether the broker is alive
+    private BiPredicate<String, String> brokerAlivePredicate;
     private volatile boolean isScheduling = false;
 
     public DLedgerController(final ControllerConfig config, final BiPredicate<String, String> brokerAlivePredicate) {
@@ -217,6 +217,10 @@ public class DLedgerController implements Controller {
         return this.dLedgerServer.getMemberState();
     }
 
+    public void setBrokerAlivePredicate(final BiPredicate<String, String> alivePredicate) {
+        this.brokerAlivePredicate = alivePredicate; // NOTE(review): field is no longer final and not volatile; confirm it is safely published to reader threads
+    }
+
     /**
      * Event handler that handle event
      */
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 2590bd205..ba350e5a3 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -157,6 +157,15 @@ public class ReplicasInfoManager {
             final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
             final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
             final Set<String> syncStateSet = syncStateInfo.getSyncStateSet();
+            // First, check whether the master is still active
+            final String oldMaster = syncStateInfo.getMasterAddress();
+            if (StringUtils.isNoneEmpty(oldMaster) && brokerAlivePredicate.test(brokerInfo.getClusterName(), oldMaster)) {
+                String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerInfo.getBrokerName());
+                log.warn("{}", err);
+                result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
+                return result;
+            }
+
             // Try elect a master in syncStateSet
             if (syncStateSet.size() > 1) {
                 boolean electSuccess = tryElectMaster(result, brokerName, syncStateSet, (candidate) ->
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
index 56730f40d..d4ef96024 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
@@ -164,10 +164,22 @@ public class DLedgerControllerTest {
         return leader;
     }
 
+    public void setBrokerAlivePredicate(DLedgerController controller, String... deathBroker) {
+        controller.setBrokerAlivePredicate((clusterName, brokerAddress) -> {
+            for (String broker : deathBroker) {
+                if (broker.equals(brokerAddress)) {
+                    return false;
+                }
+            }
+            return true;
+        });
+    }
+
     @Test
     public void testElectMaster() throws Exception {
         final DLedgerController leader = mockMetaData(false);
         final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+        setBrokerAlivePredicate(leader, "127.0.0.1:9000");
         final RemotingCommand resp = leader.electMaster(request).get(10, TimeUnit.SECONDS);
         final ElectMasterResponseHeader response = (ElectMasterResponseHeader) resp.readCustomHeader();
         assertEquals(response.getMasterEpoch(), 2);
@@ -186,6 +198,7 @@ public class DLedgerControllerTest {
         // Now we trigger electMaster api, which means the old master is shutdown and want to elect a new master.
         // However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, not more replicas can be elected as master, it will be failed.
         final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
+        setBrokerAlivePredicate(leader, "127.0.0.1:9000");
         leader.electMaster(electRequest).get(10, TimeUnit.SECONDS);
 
         final RemotingCommand resp = leader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).
@@ -225,6 +238,7 @@ public class DLedgerControllerTest {
         // However, event if the syncStateSet in statemachine is {"127.0.0.1:9000"}
         // the option {enableElectUncleanMaster = true}, so the controller sill can elect a new master
         final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
+        setBrokerAlivePredicate(leader, "127.0.0.1:9000");
         final CompletableFuture<RemotingCommand> future = leader.electMaster(electRequest);
         future.get(10, TimeUnit.SECONDS);
 
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 8bf594076..4c47b5a77 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -108,7 +108,7 @@ public class ReplicasInfoManagerTest {
     public void testElectMaster() {
         mockMetaData();
         final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
-        final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request, (va1, va2) -> true);
+        final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request, (clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"));
         final ElectMasterResponseHeader response = cResult.getResponse();
         assertEquals(response.getMasterEpoch(), 2);
         assertFalse(response.getNewMasterAddress().isEmpty());
@@ -125,7 +125,7 @@ public class ReplicasInfoManagerTest {
         // Now we trigger electMaster api, which means the old master is shutdown and want to elect a new master.
         // However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, not more replicas can be elected as master, it will be failed.
         final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
-        final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(electRequest, (va1, va2) -> true);
+        final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(electRequest, (clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"));
         final List<EventMessage> events = cResult.getEvents();
         assertEquals(events.size(), 1);
         final ElectMasterEvent event = (ElectMasterEvent) events.get(0);