You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/06/12 08:01:27 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of code] Let broker register to controller again when master not existed. (#4450)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new 5eaeeabf5 [Summer of code] Let broker register to controller again when master not existed. (#4450)
5eaeeabf5 is described below
commit 5eaeeabf51618d7f31e19816017eaafa2f287a2f
Author: hzh0425 <64...@qq.com>
AuthorDate: Sun Jun 12 16:01:08 2022 +0800
[Summer of code] Let broker register to controller again when master not existed. (#4450)
* register to controller if master is null
* stop electmaster when the old master is active
* review
* fix test bugs for brokerAlivePredicate
---
.../broker/hacontroller/ReplicasManager.java | 25 +++++++++++++---------
.../controller/impl/DLedgerController.java | 8 +++++--
.../impl/manager/ReplicasInfoManager.java | 9 ++++++++
.../controller/impl/DLedgerControllerTest.java | 14 ++++++++++++
.../impl/manager/ReplicasInfoManagerTest.java | 4 ++--
5 files changed, 46 insertions(+), 14 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index 88a93abd9..65b0d3fc3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -300,18 +300,23 @@ public class ReplicasManager {
final long brokerId = info.getBrokerId();
synchronized (this) {
// Check if master changed
- if (StringUtils.isNoneEmpty(newMasterAddress) && newMasterEpoch > this.masterEpoch) {
- if (StringUtils.equals(newMasterAddress, this.localAddress)) {
- changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch());
- } else {
- if (brokerId > 0) {
- changeToSlave(newMasterAddress, newMasterEpoch, brokerId);
- } else if (brokerId < 0) {
- // If the brokerId is no existed, we should try register again.
- registerBrokerToController();
+ if (newMasterEpoch > this.masterEpoch) {
+ if (StringUtils.isNoneEmpty(newMasterAddress)) {
+ if (StringUtils.equals(newMasterAddress, this.localAddress)) {
+ changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch());
+ } else {
+ if (brokerId > 0) {
+ changeToSlave(newMasterAddress, newMasterEpoch, brokerId);
+ } else if (brokerId < 0) {
+ // If the brokerId does not exist, we should try to register again.
+ registerBrokerToController();
+ }
}
+ } else {
+ // In this case the master recorded in the controller is null; try to register to the controller again, which will trigger the electMasterEvent in the controller.
+ registerBrokerToController();
}
- } else {
+ } else if (newMasterEpoch == this.masterEpoch) {
// Check if sync state set changed
if (isMasterState()) {
changeSyncStateSet(syncStateSet.getSyncStateSet(), syncStateSet.getSyncStateSetEpoch());
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index e99036817..67f293ad1 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -69,13 +69,13 @@ public class DLedgerController implements Controller {
private final DLedgerServer dLedgerServer;
private final ControllerConfig controllerConfig;
private final DLedgerConfig dLedgerConfig;
- // Usr for checking whether the broker is alive
- private final BiPredicate<String, String> brokerAlivePredicate;
private final ReplicasInfoManager replicasInfoManager;
private final EventScheduler scheduler;
private final EventSerializer eventSerializer;
private final RoleChangeHandler roleHandler;
private final DLedgerControllerStateMachine statemachine;
+ // Used for checking whether the broker is alive
+ private BiPredicate<String, String> brokerAlivePredicate;
private volatile boolean isScheduling = false;
public DLedgerController(final ControllerConfig config, final BiPredicate<String, String> brokerAlivePredicate) {
@@ -217,6 +217,10 @@ public class DLedgerController implements Controller {
return this.dLedgerServer.getMemberState();
}
+ public void setBrokerAlivePredicate(BiPredicate<String, String> brokerAlivePredicate) {
+ this.brokerAlivePredicate = brokerAlivePredicate;
+ }
+
/**
* Event handler that handle event
*/
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 2590bd205..ba350e5a3 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -157,6 +157,15 @@ public class ReplicasInfoManager {
final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
final Set<String> syncStateSet = syncStateInfo.getSyncStateSet();
+ // First, check whether the master is still active
+ final String oldMaster = syncStateInfo.getMasterAddress();
+ if (StringUtils.isNoneEmpty(oldMaster) && brokerAlivePredicate.test(brokerInfo.getClusterName(), oldMaster)) {
+ String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerInfo.getBrokerName());
+ log.warn("{}", err);
+ result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
+ return result;
+ }
+
// Try elect a master in syncStateSet
if (syncStateSet.size() > 1) {
boolean electSuccess = tryElectMaster(result, brokerName, syncStateSet, (candidate) ->
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
index 56730f40d..d4ef96024 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
@@ -164,10 +164,22 @@ public class DLedgerControllerTest {
return leader;
}
+ public void setBrokerAlivePredicate(DLedgerController controller, String... deathBroker) {
+ controller.setBrokerAlivePredicate((clusterName, brokerAddress) -> {
+ for (String broker : deathBroker) {
+ if (broker.equals(brokerAddress)) {
+ return false;
+ }
+ }
+ return true;
+ });
+ }
+
@Test
public void testElectMaster() throws Exception {
final DLedgerController leader = mockMetaData(false);
final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+ setBrokerAlivePredicate(leader, "127.0.0.1:9000");
final RemotingCommand resp = leader.electMaster(request).get(10, TimeUnit.SECONDS);
final ElectMasterResponseHeader response = (ElectMasterResponseHeader) resp.readCustomHeader();
assertEquals(response.getMasterEpoch(), 2);
@@ -186,6 +198,7 @@ public class DLedgerControllerTest {
// Now we trigger electMaster api, which means the old master is shutdown and want to elect a new master.
// However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, not more replicas can be elected as master, it will be failed.
final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
+ setBrokerAlivePredicate(leader, "127.0.0.1:9000");
leader.electMaster(electRequest).get(10, TimeUnit.SECONDS);
final RemotingCommand resp = leader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).
@@ -225,6 +238,7 @@ public class DLedgerControllerTest {
// However, event if the syncStateSet in statemachine is {"127.0.0.1:9000"}
// the option {enableElectUncleanMaster = true}, so the controller sill can elect a new master
final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
+ setBrokerAlivePredicate(leader, "127.0.0.1:9000");
final CompletableFuture<RemotingCommand> future = leader.electMaster(electRequest);
future.get(10, TimeUnit.SECONDS);
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 8bf594076..4c47b5a77 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -108,7 +108,7 @@ public class ReplicasInfoManagerTest {
public void testElectMaster() {
mockMetaData();
final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
- final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request, (va1, va2) -> true);
+ final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request, (clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"));
final ElectMasterResponseHeader response = cResult.getResponse();
assertEquals(response.getMasterEpoch(), 2);
assertFalse(response.getNewMasterAddress().isEmpty());
@@ -125,7 +125,7 @@ public class ReplicasInfoManagerTest {
// Now we trigger electMaster api, which means the old master is shutdown and want to elect a new master.
// However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, not more replicas can be elected as master, it will be failed.
final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
- final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(electRequest, (va1, va2) -> true);
+ final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(electRequest, (clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"));
final List<EventMessage> events = cResult.getEvents();
assertEquals(events.size(), 1);
final ElectMasterEvent event = (ElectMasterEvent) events.get(0);