You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/02/01 07:54:16 UTC
[rocketmq] branch dledger-controller-brokerId updated: [ISSUE#5045] Refactor the register and elect-master process in controller mode (#5046)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/dledger-controller-brokerId by this push:
new c5474b33a [ISSUE#5045] Refactor the register and elect-master process in controller mode (#5046)
c5474b33a is described below
commit c5474b33a5232183bf60c6ae42c6bc7efe6f14ca
Author: TheR1sing3un <87...@users.noreply.github.com>
AuthorDate: Wed Feb 1 15:54:06 2023 +0800
[ISSUE#5045] Refactor the register and elect-master process in controller mode (#5046)
* refactor(controller): refactor the register logic
1. refactor the register logic
* refactor(controller): remove unused field in ElectMasterEvent
1. remove unused field in ElectMasterEvent
* feat(controller): add a tryElectMaster request and the logic to process it
1. add a tryElectMaster request and the logic to process it
* feat(controller): refactor ReplicasInfoManagerTest
1. refactor ReplicasInfoManagerTest
* refactor(controller): refactor DLedgerControllerTest
1. refactor DLedgerControllerTest
* refactor(controller): refactor ControllerManagerTest
1. refactor ControllerManagerTest
* refactor(controller): refactor ReplicasInfoManagerTest
1. refactor ReplicasInfoManagerTest
* refactor(controller): refactor register process and pass the junit test
1. refactor ReplicasInfoManagerTest
* style(broker): rename a constant
1. rename a constant
* feat(controller): update the DLedger dependency from v0.27 to v0.30
1. update the DLedger dependency from v0.27 to v0.30
* style(controller): add a blank line just to trigger GitHub Actions again
1. add a blank line just to trigger GitHub Actions again
* feat(controller): combine electMaster api and brokerTryElectMaster api
1. combine electMaster api and brokerTryElectMaster api
* feat(controller): add logic to verify the broker id returned from registering
1. add logic to verify the broker id returned from registering
* fix(controller): remove unused code and add a warning log in ControllerManager
1. remove unused code and add a warning log in ControllerManager
* fix: resolve conflicts
1. resolve conflicts
* fix(controller): remove unused class
1. remove unused class
* fix(controller): Resolve conflicts after merging
1. Resolve conflicts after merging
* refactor(controller): Refactor ReplicasInfoManager#elect
1. Refactor ReplicasInfoManager#elect
* style(controller): remove unused imports
1. remove unused imports
* style(controller): remove unused imports
1. remove unused imports
* fix(controller): resolve conflicts after merging develop branch
1. resolve conflicts after merging develop branch
* rerun
* fix(controller): resolve conflicts in ReplicasInfoManagerTest#testRegisterNewBroker after merging develop branch
1. resolve conflicts in ReplicasInfoManagerTest#testRegisterNewBroker after merging develop branch
* style(controller): pass style check
1. pass style check
---
.../broker/controller/ReplicasManager.java | 50 ++++-
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 32 ++-
.../broker/controller/ReplicasManagerTest.java | 6 +
.../rocketmq/client/impl/MQClientAPIImpl.java | 2 +-
.../rocketmq/controller/ControllerManager.java | 36 ++--
.../rocketmq/controller/elect/ElectPolicy.java | 4 +-
.../controller/impl/DLedgerController.java | 2 +-
.../impl/DLedgerControllerStateMachine.java | 6 +-
.../controller/impl/event/ApplyBrokerIdEvent.java | 11 +-
.../controller/impl/event/ElectMasterEvent.java | 15 +-
.../{BrokerInfo.java => BrokerReplicaInfo.java} | 14 +-
.../impl/manager/ReplicasInfoManager.java | 240 ++++++++++-----------
.../controller/impl/manager/SyncStateInfo.java | 19 +-
.../processor/ControllerRequestProcessor.java | 194 ++++++++++-------
.../impl/controller/ControllerManagerTest.java | 38 +++-
.../controller/impl/DLedgerControllerTest.java | 69 +++---
.../impl/manager/ReplicasInfoManagerTest.java | 186 ++++++++++++----
.../rocketmq/remoting/protocol/ResponseCode.java | 9 +-
.../protocol/body/RoleChangeNotifyEntry.java | 60 ++++++
.../controller/ElectMasterRequestHeader.java | 61 +++++-
.../controller/ElectMasterResponseHeader.java | 23 +-
.../controller/ReElectMasterSubCommand.java | 2 +-
22 files changed, 734 insertions(+), 345 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index a0218f8cc..02e24a859 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.EpochEntry;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerResponseHeader;
@@ -103,6 +104,9 @@ public class ReplicasManager {
enum State {
INITIAL,
FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
+
+ FIRST_TIME_WAIT_MASTER_IS_ELECTED,
+
RUNNING,
SHUTDOWN,
}
@@ -141,6 +145,15 @@ public class ReplicasManager {
if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
if (registerBrokerToController()) {
LOGGER.info("First time register broker success");
+ this.state = State.FIRST_TIME_WAIT_MASTER_IS_ELECTED;
+ } else {
+ return false;
+ }
+ }
+
+ if (this.state == State.FIRST_TIME_WAIT_MASTER_IS_ELECTED) {
+ if (StringUtils.isNotEmpty(this.masterAddress) || brokerElect()) {
+ LOGGER.info("Master in this broker set is elected");
this.state = State.RUNNING;
} else {
return false;
@@ -287,6 +300,30 @@ public class ReplicasManager {
}
}
+ private boolean brokerElect() {
+ // Broker try to elect itself as a master in broker set.
+ try {
+ ElectMasterResponseHeader tryElectResponse = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(),
+ this.brokerConfig.getBrokerName(), this.localAddress);
+ final String masterAddress = tryElectResponse.getMasterAddress();
+ if (StringUtils.isEmpty(masterAddress)) {
+ LOGGER.warn("Now no master in broker set");
+ return false;
+ }
+
+ if (StringUtils.equals(masterAddress, this.localAddress)) {
+ changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch());
+ } else {
+ changeToSlave(masterAddress, tryElectResponse.getMasterEpoch(), tryElectResponse.getBrokerId());
+ }
+ brokerController.setIsolated(false);
+ return true;
+ } catch (Exception e) {
+ LOGGER.error("Failed to try elect", e);
+ return false;
+ }
+ }
+
private boolean registerBrokerToController() {
// Register this broker to controller, get brokerId and masterAddress.
try {
@@ -303,8 +340,13 @@ public class ReplicasManager {
// Set isolated to false, make broker can register to namesrv regularly
brokerController.setIsolated(false);
} else {
- LOGGER.warn("No master in controller");
- return false;
+ // if master address is empty, just apply the brokerId
+ if (registerResponse.getBrokerId() <= 0) {
+ // wrong broker id
+ LOGGER.error("Register to controller but receive a invalid broker id = {}", registerResponse.getBrokerId());
+ return false;
+ }
+ this.brokerConfig.setBrokerId(registerResponse.getBrokerId());
}
return true;
} catch (final Exception e) {
@@ -340,8 +382,8 @@ public class ReplicasManager {
}
}
} else {
- // In this case, the master in controller is null, try register to controller again, this will trigger the electMasterEvent in controller.
- registerBrokerToController();
+ // In this case, the master in controller is null, try elect in controller, this will trigger the electMasterEvent in controller.
+ brokerElect();
}
} else if (newMasterEpoch == this.masterEpoch) {
// Check if sync state set changed
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 10cd27342..83f798474 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -116,6 +116,8 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionResp
import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
@@ -129,6 +131,9 @@ import org.apache.rocketmq.store.timer.TimerMetrics;
import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_METADATA_NOT_EXIST;
+import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED;
+import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_ELECT_MASTER_FAILED;
+import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_MASTER_STILL_EXIST;
import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_NOT_LEADER;
public class BrokerOuterAPI {
@@ -203,7 +208,7 @@ public class BrokerOuterAPI {
public void updateNameServerAddressList(final String addrs) {
String[] addrArray = addrs.split(";");
- List<String> lst = new ArrayList<>(Arrays.asList(addrArray));
+ List<String> lst = new ArrayList<String>(Arrays.asList(addrArray));
this.remotingClient.updateNameServerAddressList(lst);
}
@@ -1156,6 +1161,31 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
+ /**
+ * Broker try to elect itself as a master in broker set
+ */
+ public ElectMasterResponseHeader brokerElect(String controllerAddress, String clusterName, String brokerName,
+ String brokerAddress) throws Exception {
+
+ final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerAddress);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, requestHeader);
+ RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
+ assert response != null;
+ switch (response.getCode()) {
+ case CONTROLLER_NOT_LEADER: {
+ throw new MQBrokerException(response.getCode(), "Controller leader was changed");
+ }
+ case CONTROLLER_BROKER_NEED_TO_BE_REGISTERED:
+ throw new MQBrokerException(response.getCode(), response.getRemark());
+ case CONTROLLER_ELECT_MASTER_FAILED:
+ case CONTROLLER_MASTER_STILL_EXIST:
+ case SUCCESS:
+ return (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
+ }
+ throw new MQBrokerException(response.getCode(), response.getRemark());
+ }
+
+
/**
* Register broker to controller
*/
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index 01eacf43b..1414778d4 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
@@ -70,6 +71,8 @@ public class ReplicasManagerTest {
private RegisterBrokerToControllerResponseHeader registerBrokerToControllerResponseHeader;
+ private ElectMasterResponseHeader brokerTryElectResponseHeader;
+
private Pair<GetReplicaInfoResponseHeader, SyncStateSet> result;
private GetReplicaInfoResponseHeader getReplicaInfoResponseHeader;
@@ -108,6 +111,8 @@ public class ReplicasManagerTest {
getMetaDataResponseHeader = new GetMetaDataResponseHeader(GROUP, LEADER_ID, OLD_MASTER_ADDRESS, IS_LEADER, PEERS);
registerBrokerToControllerResponseHeader = new RegisterBrokerToControllerResponseHeader();
registerBrokerToControllerResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS);
+ brokerTryElectResponseHeader = new ElectMasterResponseHeader();
+ brokerTryElectResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS);
getReplicaInfoResponseHeader = new GetReplicaInfoResponseHeader();
getReplicaInfoResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS);
getReplicaInfoResponseHeader.setBrokerId(MASTER_BROKER_ID);
@@ -125,6 +130,7 @@ public class ReplicasManagerTest {
when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyLong(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
when(brokerOuterAPI.getReplicaInfo(any(), any(), any())).thenReturn(result);
+ when(brokerOuterAPI.brokerElect(any(), any(), any(), any())).thenReturn(brokerTryElectResponseHeader);
replicasManager = new ReplicasManager(brokerController);
autoSwitchHAService.init(defaultMessageStore);
replicasManager.start();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index f251e2b00..2748acbd9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -3083,7 +3083,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
assert controllerMetaData != null;
assert controllerMetaData.getControllerLeaderAddress() != null;
final String leaderAddress = controllerMetaData.getControllerLeaderAddress();
- ElectMasterRequestHeader electRequestHeader = new ElectMasterRequestHeader(clusterName, brokerName, brokerAddr);
+ ElectMasterRequestHeader electRequestHeader = ElectMasterRequestHeader.ofAdminTrigger(clusterName, brokerName, brokerAddr);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, electRequestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(leaderAddress, request, 3000);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index a2c570817..662403192 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -24,12 +24,14 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
+
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
@@ -45,6 +47,7 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
+import org.apache.rocketmq.remoting.protocol.body.RoleChangeNotifyEntry;
import org.apache.rocketmq.remoting.protocol.header.NotifyBrokerRoleChangedRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
@@ -87,7 +90,7 @@ public class ControllerManager {
new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
@Override
protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
- return new FutureTaskExt<>(runnable, value);
+ return new FutureTaskExt<T>(runnable, value);
}
};
this.heartbeatManager = new DefaultBrokerHeartbeatManager(this.controllerConfig);
@@ -127,13 +130,14 @@ public class ControllerManager {
log.warn("The {} broker with IP address {} shutdown", brokerName, brokerAddress);
return;
}
- final CompletableFuture<RemotingCommand> electMasterFuture = controller.electMaster(new ElectMasterRequestHeader(brokerName));
+
+ final CompletableFuture<RemotingCommand> electMasterFuture = controller.electMaster(ElectMasterRequestHeader.ofControllerTrigger(brokerName));
final RemotingCommand electMasterResponse = electMasterFuture.get(5, TimeUnit.SECONDS);
final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) electMasterResponse.readCustomHeader();
if (responseHeader != null) {
log.info("Broker {}'s master {} shutdown, elect a new master done, result:{}", brokerName, brokerAddress, responseHeader);
if (controllerConfig.isNotifyBrokerRoleChanged()) {
- notifyBrokerRoleChanged(responseHeader, clusterName);
+ notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(responseHeader));
}
}
} catch (Exception e) {
@@ -147,20 +151,24 @@ public class ControllerManager {
/**
* Notify master and all slaves for a broker that the master role changed.
*/
- public void notifyBrokerRoleChanged(final ElectMasterResponseHeader electMasterResult, final String clusterName) {
- final BrokerMemberGroup memberGroup = electMasterResult.getBrokerMemberGroup();
+ public void notifyBrokerRoleChanged(final RoleChangeNotifyEntry entry) {
+ final BrokerMemberGroup memberGroup = entry.getBrokerMemberGroup();
if (memberGroup != null) {
+ final String master = entry.getMasterAddress();
+ if (StringUtils.isEmpty(master)) {
+ log.warn("Notify broker role change failed, because member group is not null but the new master address is empty, entry:{}", entry);
+ return;
+ }
// First, inform the master
- final String master = electMasterResult.getNewMasterAddress();
- if (StringUtils.isNoneEmpty(master) && this.heartbeatManager.isBrokerActive(clusterName, master)) {
- doNotifyBrokerRoleChanged(master, MixAll.MASTER_ID, electMasterResult);
+ if (this.heartbeatManager.isBrokerActive(memberGroup.getCluster(), master)) {
+ doNotifyBrokerRoleChanged(master, MixAll.MASTER_ID, entry);
}
// Then, inform all slaves
final Map<Long, String> brokerIdAddrs = memberGroup.getBrokerAddrs();
for (Map.Entry<Long, String> broker : brokerIdAddrs.entrySet()) {
- if (!broker.getValue().equals(master) && this.heartbeatManager.isBrokerActive(clusterName, broker.getValue())) {
- doNotifyBrokerRoleChanged(broker.getValue(), broker.getKey(), electMasterResult);
+ if (!master.equals(broker.getValue()) && this.heartbeatManager.isBrokerActive(memberGroup.getCluster(), broker.getValue())) {
+ doNotifyBrokerRoleChanged(broker.getValue(), broker.getKey(), entry);
}
}
@@ -168,11 +176,11 @@ public class ControllerManager {
}
public void doNotifyBrokerRoleChanged(final String brokerAddr, final Long brokerId,
- final ElectMasterResponseHeader responseHeader) {
+ final RoleChangeNotifyEntry entry) {
if (StringUtils.isNoneEmpty(brokerAddr)) {
- log.info("Try notify broker {} with id {} that role changed, responseHeader:{}", brokerAddr, brokerId, responseHeader);
- final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(responseHeader.getNewMasterAddress(),
- responseHeader.getMasterEpoch(), responseHeader.getSyncStateSetEpoch(), brokerId);
+ log.info("Try notify broker {} with id {} that role changed, RoleChangeNotifyEntry:{}", brokerAddr, brokerId, entry);
+ final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(entry.getMasterAddress(),
+ entry.getMasterEpoch(), entry.getSyncStateSetEpoch(), brokerId);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, requestHeader);
try {
this.remotingClient.invokeOneway(brokerAddr, request, 3000);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java b/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
index 214012e51..aba8f5538 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
@@ -28,9 +28,9 @@ public interface ElectPolicy {
* @param syncStateBrokers all broker replicas in syncStateSet
* @param allReplicaBrokers all broker replicas
* @param oldMaster old master
- * @param preferBrokerAddr the broker prefer to be elected
+ * @param brokerAddr broker address(can be used as prefer or assigned in some elect policy)
* @return new master's brokerAddr
*/
- String elect(String clusterName, Set<String> syncStateBrokers, Set<String> allReplicaBrokers, String oldMaster, String preferBrokerAddr);
+ String elect(String clusterName, Set<String> syncStateBrokers, Set<String> allReplicaBrokers, String oldMaster, String brokerAddr);
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index f9ea41174..f4f32e05c 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -107,7 +107,7 @@ public class DLedgerController implements Controller {
this.roleHandler = new RoleChangeHandler(dLedgerConfig.getSelfId());
this.replicasInfoManager = new ReplicasInfoManager(controllerConfig);
- this.statemachine = new DLedgerControllerStateMachine(replicasInfoManager, this.eventSerializer, dLedgerConfig.getSelfId());
+ this.statemachine = new DLedgerControllerStateMachine(replicasInfoManager, this.eventSerializer, dLedgerConfig.getGroup(), dLedgerConfig.getSelfId());
// Register statemachine and role handler.
this.dLedgerServer = new DLedgerServer(dLedgerConfig, nettyServerConfig, nettyClientConfig, channelEventListener);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
index dde94e998..39841d4e6 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
@@ -39,10 +39,10 @@ public class DLedgerControllerStateMachine implements StateMachine {
private final String dLedgerId;
public DLedgerControllerStateMachine(final ReplicasInfoManager replicasInfoManager,
- final EventSerializer eventSerializer, final String dLedgerId) {
+ final EventSerializer eventSerializer, final String dLedgerGroupId, final String dLedgerSelfId) {
this.replicasInfoManager = replicasInfoManager;
this.eventSerializer = eventSerializer;
- this.dLedgerId = dLedgerId;
+ this.dLedgerId = generateDLedgerId(dLedgerGroupId, dLedgerSelfId);
}
@Override
@@ -76,6 +76,6 @@ public class DLedgerControllerStateMachine implements StateMachine {
@Override
public String getBindDLedgerId() {
- return dLedgerId;
+ return this.dLedgerId;
}
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
index 8cec2ec9a..c4934d7c0 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
@@ -21,11 +21,13 @@ package org.apache.rocketmq.controller.impl.event;
* Triggered by the RegisterBrokerApi.
*/
public class ApplyBrokerIdEvent implements EventMessage {
+ private final String clusterName;
private final String brokerName;
private final String brokerAddress;
private final long newBrokerId;
- public ApplyBrokerIdEvent(String brokerName, String brokerAddress, long newBrokerId) {
+ public ApplyBrokerIdEvent(String clusterName, String brokerName, String brokerAddress, long newBrokerId) {
+ this.clusterName = clusterName;
this.brokerName = brokerName;
this.brokerAddress = brokerAddress;
this.newBrokerId = newBrokerId;
@@ -48,10 +50,15 @@ public class ApplyBrokerIdEvent implements EventMessage {
return newBrokerId;
}
+ public String getClusterName() {
+ return clusterName;
+ }
+
@Override
public String toString() {
return "ApplyBrokerIdEvent{" +
- "brokerName='" + brokerName + '\'' +
+ "clusterName='" + clusterName + '\'' +
+ ", brokerName='" + brokerName + '\'' +
", brokerAddress='" + brokerAddress + '\'' +
", newBrokerId=" + newBrokerId +
'}';
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
index eb3b7984e..970b5d8cd 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
@@ -25,21 +25,19 @@ public class ElectMasterEvent implements EventMessage {
private final boolean newMasterElected;
private final String brokerName;
private final String newMasterAddress;
- private final String clusterName;
public ElectMasterEvent(boolean newMasterElected, String brokerName) {
- this(newMasterElected, brokerName, "", "");
+ this(newMasterElected, brokerName, "");
}
public ElectMasterEvent(String brokerName, String newMasterAddress) {
- this(true, brokerName, newMasterAddress, "");
+ this(true, brokerName, newMasterAddress);
}
- public ElectMasterEvent(boolean newMasterElected, String brokerName, String newMasterAddress, String clusterName) {
+ public ElectMasterEvent(boolean newMasterElected, String brokerName, String newMasterAddress) {
this.newMasterElected = newMasterElected;
this.brokerName = brokerName;
this.newMasterAddress = newMasterAddress;
- this.clusterName = clusterName;
}
@Override
@@ -59,17 +57,12 @@ public class ElectMasterEvent implements EventMessage {
return newMasterAddress;
}
- public String getClusterName() {
- return clusterName;
- }
-
@Override
public String toString() {
return "ElectMasterEvent{" +
- "isNewMasterElected=" + newMasterElected +
+ "newMasterElected=" + newMasterElected +
", brokerName='" + brokerName + '\'' +
", newMasterAddress='" + newMasterAddress + '\'' +
- ", clusterName='" + clusterName + '\'' +
'}';
}
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
similarity index 84%
rename from controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
index 0d56285fb..e2a68a544 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
@@ -20,19 +20,23 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.MixAll;
-public class BrokerInfo {
+/**
+ * Broker replicas info, mapping from brokerAddress to {brokerId, brokerHaAddress}.
+ */
+public class BrokerReplicaInfo {
private final String clusterName;
private final String brokerName;
// Start from 1
- private final AtomicLong brokerIdCount;
+ private final AtomicLong nextAssignBrokerId;
private final HashMap<String/*Address*/, Long/*brokerId*/> brokerIdTable;
- public BrokerInfo(String clusterName, String brokerName) {
+ public BrokerReplicaInfo(String clusterName, String brokerName) {
this.clusterName = clusterName;
this.brokerName = brokerName;
- this.brokerIdCount = new AtomicLong(1L);
this.brokerIdTable = new HashMap<>();
+ this.nextAssignBrokerId = new AtomicLong(MixAll.FIRST_SLAVE_ID);
}
public void removeBrokerAddress(final String address) {
@@ -40,7 +44,7 @@ public class BrokerInfo {
}
public long newBrokerId() {
- return this.brokerIdCount.incrementAndGet();
+ return this.nextAssignBrokerId.getAndIncrement();
}
public String getClusterName() {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index a820b069e..7b57bff9b 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -61,7 +61,7 @@ import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToC
public class ReplicasInfoManager {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
private final ControllerConfig controllerConfig;
- private final Map<String/* brokerName */, BrokerInfo> replicaInfoTable;
+ private final Map<String/* brokerName */, BrokerReplicaInfo> replicaInfoTable;
private final Map<String/* brokerName */, SyncStateInfo> syncStateSetInfoTable;
public ReplicasInfoManager(final ControllerConfig config) {
@@ -80,7 +80,7 @@ public class ReplicasInfoManager {
if (isContainsBroker(brokerName)) {
final Set<String> newSyncStateSet = syncStateSet.getSyncStateSet();
final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
- final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+ final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
// Check whether the oldSyncStateSet is equal with newSyncStateSet
final Set<String> oldSyncStateSet = syncStateInfo.getSyncStateSet();
@@ -120,13 +120,13 @@ public class ReplicasInfoManager {
// Check newSyncStateSet correctness
for (String replicas : newSyncStateSet) {
- if (!brokerInfo.isBrokerExist(replicas)) {
+ if (!brokerReplicaInfo.isBrokerExist(replicas)) {
String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't exist", replicas);
LOGGER.error("{}", err);
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REPLICAS, err);
return result;
}
- if (!brokerAlivePredicate.test(brokerInfo.getClusterName(), replicas)) {
+ if (!brokerAlivePredicate.test(brokerReplicaInfo.getClusterName(), replicas)) {
String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't alive", replicas);
LOGGER.error(err);
result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NOT_ALIVE, err);
@@ -156,57 +156,85 @@ public class ReplicasInfoManager {
public ControllerResult<ElectMasterResponseHeader> electMaster(final ElectMasterRequestHeader request,
final ElectPolicy electPolicy) {
final String brokerName = request.getBrokerName();
- final String assignBrokerAddress = request.getBrokerAddress();
+ final String brokerAddress = request.getBrokerAddress();
final ControllerResult<ElectMasterResponseHeader> result = new ControllerResult<>(new ElectMasterResponseHeader());
- if (isContainsBroker(brokerName)) {
- final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
- final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
- final Set<String> syncStateSet = syncStateInfo.getSyncStateSet();
- final String oldMaster = syncStateInfo.getMasterAddress();
- Set<String> allReplicaBrokers = controllerConfig.isEnableElectUncleanMaster() ? brokerInfo.getAllBroker() : null;
-
- // elect by policy
- String newMaster = electPolicy.elect(brokerInfo.getClusterName(), syncStateSet, allReplicaBrokers, oldMaster, assignBrokerAddress);
- if (StringUtils.isNotEmpty(newMaster) && newMaster.equals(oldMaster)) {
- // old master still valid, change nothing
- String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerInfo.getBrokerName());
- LOGGER.warn("{}", err);
- result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, err);
- return result;
- }
- // a new master is elected
- if (StringUtils.isNotEmpty(newMaster)) {
- final int masterEpoch = this.syncStateSetInfoTable.get(brokerName).getMasterEpoch();
- final int syncStateSetEpoch = this.syncStateSetInfoTable.get(brokerName).getSyncStateSetEpoch();
- final ElectMasterResponseHeader response = result.getResponse();
- response.setNewMasterAddress(newMaster);
- response.setMasterEpoch(masterEpoch + 1);
- response.setSyncStateSetEpoch(syncStateSetEpoch);
- BrokerMemberGroup brokerMemberGroup = buildBrokerMemberGroup(brokerName);
- if (null != brokerMemberGroup) {
- response.setBrokerMemberGroup(brokerMemberGroup);
- result.setBody(brokerMemberGroup.encode());
- }
- final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster);
- result.addEvent(event);
- return result;
+ final ElectMasterResponseHeader response = result.getResponse();
+ if (!isContainsBroker(brokerName)) {
+ // this broker set hasn't been registered
+ response.setMasterAddress("");
+ result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED, "Broker hasn't been registered");
+ return result;
+ }
+
+ final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
+ final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
+ final Set<String> syncStateSet = syncStateInfo.getSyncStateSet();
+ final String oldMaster = syncStateInfo.getMasterAddress();
+ Set<String> allReplicaBrokers = controllerConfig.isEnableElectUncleanMaster() ? brokerReplicaInfo.getAllBroker() : null;
+ String newMaster = null;
+
+ if (syncStateInfo.isFirstTimeForElect()) {
+ // If this broker set has never had a master, i.e. this is the first election for it,
+ // elect the requesting broker as the first master
+ newMaster = brokerAddress;
+ }
+
+ // elect by policy
+ if (newMaster == null) {
+ // only pass the requester's brokerAddress as the assigned address when a forced election is requested
+ String assignedBrokerAddr = request.isForceElect() ? brokerAddress : null;
+ newMaster = electPolicy.elect(brokerReplicaInfo.getClusterName(), syncStateSet, allReplicaBrokers, oldMaster, assignedBrokerAddr);
+ }
+
+ if (StringUtils.isNotEmpty(newMaster) && newMaster.equals(oldMaster)) {
+ // old master still valid, change nothing
+ String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerReplicaInfo.getBrokerName());
+ LOGGER.warn("{}", err);
+ // the master still exists, so return its current info
+ response.setMasterEpoch(syncStateInfo.getMasterEpoch());
+ response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
+ response.setMasterAddress(syncStateInfo.getMasterAddress());
+ response.setBrokerId(brokerReplicaInfo.getBrokerId(request.getBrokerAddress()));
+ result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err);
+ return result;
+ }
+
+ // a new master is elected
+ if (StringUtils.isNotEmpty(newMaster)) {
+ final int masterEpoch = syncStateInfo.getMasterEpoch();
+ final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch();
+ response.setMasterAddress(newMaster);
+ response.setMasterEpoch(masterEpoch + 1);
+ response.setSyncStateSetEpoch(syncStateSetEpoch + 1);
+ response.setBrokerId(brokerReplicaInfo.getBrokerId(request.getBrokerAddress()));
+ BrokerMemberGroup brokerMemberGroup = buildBrokerMemberGroup(brokerName);
+ if (null != brokerMemberGroup) {
+ response.setBrokerMemberGroup(brokerMemberGroup);
+ result.setBody(brokerMemberGroup.encode());
}
- // If elect failed, we still need to apply an ElectMasterEvent to tell the statemachine
- // that the master was shutdown and no new master was elected.
- final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
+ final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster);
result.addEvent(event);
- result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Failed to elect a new broker master");
return result;
}
- result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, "Broker metadata is not existed");
+ // If the election failed and it was triggered by the controller (indicated by a null brokerAddress in the request),
+ // we still need to apply an ElectMasterEvent to tell the state machine
+ // that the master was shut down and no new master was elected.
+ if (request.getBrokerAddress() == null) {
+ final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
+ result.addEvent(event);
+ result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Old master has down and failed to elect a new broker master");
+ } else {
+ result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, "Failed to elect a new master");
+ }
+ response.setMasterAddress("");
return result;
}
private BrokerMemberGroup buildBrokerMemberGroup(final String brokerName) {
if (isContainsBroker(brokerName)) {
- final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
- final BrokerMemberGroup group = new BrokerMemberGroup(brokerInfo.getClusterName(), brokerName);
- final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+ final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
+ final BrokerMemberGroup group = new BrokerMemberGroup(brokerReplicaInfo.getClusterName(), brokerName);
+ final HashMap<String, Long> brokerIdTable = brokerReplicaInfo.getBrokerIdTable();
final HashMap<Long, String> memberGroup = new HashMap<>();
brokerIdTable.forEach((addr, id) -> memberGroup.put(id, addr));
group.setBrokerAddrs(memberGroup);
@@ -222,80 +250,40 @@ public class ReplicasInfoManager {
final String clusterName = request.getClusterName();
final ControllerResult<RegisterBrokerToControllerResponseHeader> result = new ControllerResult<>(new RegisterBrokerToControllerResponseHeader());
final RegisterBrokerToControllerResponseHeader response = result.getResponse();
- boolean canBeElectedAsMaster;
-
+ // If the broker's metadata does not exist in the state machine, we can assign the broker a brokerId valued 1
+ // By default, we set this variable to a value of 1
+ long brokerId = MixAll.FIRST_SLAVE_ID;
+ boolean shouldApplyBrokerId = true;
if (isContainsBroker(brokerName)) {
final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
- final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+ final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
- // Get brokerId.
- long brokerId;
- if (!brokerInfo.isBrokerExist(brokerAddress)) {
- // If this broker replicas is first time come online, we need to apply a new id for this replicas.
- brokerId = brokerInfo.newBrokerId();
- final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(brokerName, brokerAddress, brokerId);
- result.addEvent(applyIdEvent);
+ if (brokerReplicaInfo.isBrokerExist(brokerAddress)) {
+ // this broker has already been registered
+ brokerId = brokerReplicaInfo.getBrokerId(brokerAddress);
+ shouldApplyBrokerId = false;
} else {
- brokerId = brokerInfo.getBrokerId(brokerAddress);
+ // If this broker replica is coming online for the first time, we need to apply for a new id for it.
+ brokerId = brokerReplicaInfo.newBrokerId();
}
- response.setBrokerId(brokerId);
- response.setMasterEpoch(syncStateInfo.getMasterEpoch());
- response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
if (syncStateInfo.isMasterExist() && brokerAlivePredicate.test(clusterName, syncStateInfo.getMasterAddress())) {
// If the master is alive, just return master info.
final String masterAddress = syncStateInfo.getMasterAddress();
response.setMasterAddress(masterAddress);
- return result;
- } else if (syncStateInfo.isMasterExist() && !brokerAlivePredicate.test(clusterName, syncStateInfo.getMasterAddress())) {
- // filter alive slave broker
- Set<String> aliveSlaveBrokerAddressSet = syncStateInfo.getSyncStateSet().stream()
- .filter(brokerAddr -> brokerAlivePredicate.test(clusterName, brokerAddr) && !StringUtils.equals(brokerAddr, syncStateInfo.getMasterAddress()))
- .collect(Collectors.toSet());
- if (null != aliveSlaveBrokerAddressSet && aliveSlaveBrokerAddressSet.size() > 0) {
- if (!aliveSlaveBrokerAddressSet.contains(brokerAddress)) {
- brokerAddress = aliveSlaveBrokerAddressSet.iterator().next();
- }
- canBeElectedAsMaster = true;
- } else {
- // If the master is not alive and all slave is not alive, we should elect a new master:
- // Case1: This replicas was in sync state set list
- // Case2: The option {EnableElectUncleanMaster} is true
- canBeElectedAsMaster = syncStateInfo.getSyncStateSet().contains(brokerAddress) || this.controllerConfig.isEnableElectUncleanMaster();
- }
- if (!canBeElectedAsMaster) {
- // still need to apply an ElectMasterEvent to tell the statemachine
- // that the master was shutdown and no new master was elected. set SyncStateInfo.masterAddress empty
- final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
- result.addEvent(event);
- }
- } else {
- // If the master is not alive, we should elect a new master:
- // Case1: This replicas was in sync state set list
- // Case2: The option {EnableElectUncleanMaster} is true
- canBeElectedAsMaster = syncStateInfo.getSyncStateSet().contains(brokerAddress) || this.controllerConfig.isEnableElectUncleanMaster();
+ response.setMasterEpoch(syncStateInfo.getMasterEpoch());
+ response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
}
- } else {
- // If the broker's metadata does not exist in the state machine, the replicas can be elected as master directly.
- canBeElectedAsMaster = true;
}
- if (canBeElectedAsMaster) {
- final boolean isBrokerExist = isContainsBroker(brokerName);
- int masterEpoch = isBrokerExist ? this.syncStateSetInfoTable.get(brokerName).getMasterEpoch() + 1 : 1;
- int syncStateSetEpoch = isBrokerExist ? this.syncStateSetInfoTable.get(brokerName).getSyncStateSetEpoch() + 1 : 1;
- response.setMasterAddress(brokerAddress);
- response.setMasterEpoch(masterEpoch);
- response.setSyncStateSetEpoch(syncStateSetEpoch);
- response.setBrokerId(MixAll.MASTER_ID);
-
- final ElectMasterEvent event = new ElectMasterEvent(true, brokerName, brokerAddress, clusterName);
- result.addEvent(event);
- return result;
+ response.setBrokerId(brokerId);
+ if (response.getMasterAddress() == null) {
+ response.setMasterAddress("");
+ }
+ if (shouldApplyBrokerId) {
+ final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(request.getClusterName(), brokerName, brokerAddress, brokerId);
+ result.addEvent(applyIdEvent);
}
-
- response.setMasterAddress("");
- result.setCodeAndRemark(ResponseCode.CONTROLLER_REGISTER_BROKER_FAILED, "The broker has not master, and this new registered broker can't not be elected as master");
return result;
}
@@ -306,12 +294,12 @@ public class ReplicasInfoManager {
if (isContainsBroker(brokerName)) {
// If exist broker metadata, just return metadata
final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
- final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+ final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
final String masterAddress = syncStateInfo.getMasterAddress();
response.setMasterAddress(masterAddress);
response.setMasterEpoch(syncStateInfo.getMasterEpoch());
if (StringUtils.isNotEmpty(request.getBrokerAddress())) {
- response.setBrokerId(brokerInfo.getBrokerId(request.getBrokerAddress()));
+ response.setBrokerId(brokerReplicaInfo.getBrokerId(request.getBrokerAddress()));
}
result.setBody(new SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode());
return result;
@@ -327,12 +315,12 @@ public class ReplicasInfoManager {
if (isContainsBroker(brokerName)) {
// If exist broker metadata, just return metadata
final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
- final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+ final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
final Set<String> syncStateSet = syncStateInfo.getSyncStateSet();
final String master = syncStateInfo.getMasterAddress();
final ArrayList<InSyncStateData.InSyncMember> inSyncMembers = new ArrayList<>();
syncStateSet.forEach(replicas -> {
- long brokerId = StringUtils.equals(master, replicas) ? MixAll.MASTER_ID : brokerInfo.getBrokerId(replicas);
+ long brokerId = StringUtils.equals(master, replicas) ? MixAll.MASTER_ID : brokerReplicaInfo.getBrokerId(replicas);
inSyncMembers.add(new InSyncStateData.InSyncMember(replicas, brokerId));
});
@@ -417,10 +405,21 @@ public class ReplicasInfoManager {
private void handleApplyBrokerId(final ApplyBrokerIdEvent event) {
final String brokerName = event.getBrokerName();
if (isContainsBroker(brokerName)) {
- final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
- if (!brokerInfo.isBrokerExist(event.getBrokerAddress())) {
- brokerInfo.addBroker(event.getBrokerAddress(), event.getNewBrokerId());
+ final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
+ if (!brokerReplicaInfo.isBrokerExist(event.getBrokerAddress())) {
+ brokerReplicaInfo.addBroker(event.getBrokerAddress(), event.getNewBrokerId());
}
+ } else {
+ // First time to register in this broker set
+ // Initialize the replicaInfo about this broker set
+ final String clusterName = event.getClusterName();
+ final BrokerReplicaInfo brokerReplicaInfo = new BrokerReplicaInfo(clusterName, brokerName);
+ long brokerId = brokerReplicaInfo.newBrokerId();
+ brokerReplicaInfo.addBroker(event.getBrokerAddress(), brokerId);
+ this.replicaInfoTable.put(brokerName, brokerReplicaInfo);
+ final SyncStateInfo syncStateInfo = new SyncStateInfo(clusterName, brokerName);
+ // Initialize an empty syncStateInfo for this broker set
+ this.syncStateSetInfoTable.put(brokerName, syncStateInfo);
}
}
@@ -443,16 +442,9 @@ public class ReplicasInfoManager {
// So we should delete old master, but retain newSyncStateSet list.
syncStateInfo.updateMasterInfo("");
}
- } else {
- // When the first replicas of a broker come online,
- // we can create memory meta information for the broker, and regard it as master
- final String clusterName = event.getClusterName();
- final BrokerInfo brokerInfo = new BrokerInfo(clusterName, brokerName);
- brokerInfo.addBroker(newMaster, 1L);
- final SyncStateInfo syncStateInfo = new SyncStateInfo(clusterName, brokerName, newMaster);
- this.syncStateSetInfoTable.put(brokerName, syncStateInfo);
- this.replicaInfoTable.put(brokerName, brokerInfo);
+ return;
}
+ LOGGER.error("Receive an ElectMasterEvent which contains the un-registered broker, event = {}", event);
}
private void handleCleanBrokerDataEvent(final CleanBrokerDataEvent event) {
@@ -468,13 +460,13 @@ public class ReplicasInfoManager {
if (!isContainsBroker(brokerName)) {
return;
}
- final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+ final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
for (String brokerAddress : brokerAddressSet) {
- brokerInfo.removeBrokerAddress(brokerAddress);
+ brokerReplicaInfo.removeBrokerAddress(brokerAddress);
syncStateInfo.removeSyncState(brokerAddress);
}
- if (brokerInfo.getBrokerIdTable().isEmpty()) {
+ if (brokerReplicaInfo.getBrokerIdTable().isEmpty()) {
this.replicaInfoTable.remove(brokerName);
}
if (syncStateInfo.getSyncStateSet().isEmpty()) {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
index 997dee3c5..29570b5ea 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
@@ -16,8 +16,10 @@
*/
package org.apache.rocketmq.controller.impl.manager;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
/**
* Manages the syncStateSet of broker replicas.
@@ -25,13 +27,21 @@ import java.util.Set;
public class SyncStateInfo {
private final String clusterName;
private final String brokerName;
-
private Set<String/*Address*/> syncStateSet;
private int syncStateSetEpoch;
private String masterAddress;
private int masterEpoch;
+ public SyncStateInfo(String clusterName, String brokerName) {
+ this.clusterName = clusterName;
+ this.brokerName = brokerName;
+ this.masterEpoch = 0;
+ this.syncStateSetEpoch = 0;
+ this.syncStateSet = Collections.emptySet();
+ }
+
+
public SyncStateInfo(String clusterName, String brokerName, String masterAddress) {
this.clusterName = clusterName;
this.brokerName = brokerName;
@@ -42,6 +52,7 @@ public class SyncStateInfo {
this.syncStateSetEpoch = 1;
}
+
public void updateMasterInfo(String masterAddress) {
this.masterAddress = masterAddress;
this.masterEpoch++;
@@ -52,8 +63,12 @@ public class SyncStateInfo {
this.syncStateSetEpoch++;
}
+ public boolean isFirstTimeForElect() {
+ return this.masterEpoch == 0;
+ }
+
public boolean isMasterExist() {
- return !this.masterAddress.isEmpty();
+ return StringUtils.isNotEmpty(this.masterAddress);
}
public String getClusterName() {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index cdc4abee0..78ebad8fc 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
@@ -33,6 +34,7 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.RoleChangeNotifyEntry;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
@@ -77,100 +79,125 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
request);
}
switch (request.getCode()) {
- case CONTROLLER_ALTER_SYNC_STATE_SET: {
- final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
- final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
- final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet);
- if (future != null) {
- return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- }
- break;
+ case CONTROLLER_ALTER_SYNC_STATE_SET:
+ return this.handleAlterSyncStateSet(ctx, request);
+ case CONTROLLER_ELECT_MASTER:
+ return this.handleControllerElectMaster(ctx, request);
+ case CONTROLLER_REGISTER_BROKER:
+ return this.handleControllerRegisterBroker(ctx, request);
+ case CONTROLLER_GET_REPLICA_INFO:
+ return this.handleControllerGetReplicaInfo(ctx, request);
+ case CONTROLLER_GET_METADATA_INFO:
+ return this.handleControllerGetMetadataInfo(ctx, request);
+ case BROKER_HEARTBEAT:
+ return this.handleBrokerHeartbeat(ctx, request);
+ case CONTROLLER_GET_SYNC_STATE_DATA:
+ return this.handleControllerGetSyncStateData(ctx, request);
+ case UPDATE_CONTROLLER_CONFIG:
+ return this.handleUpdateControllerConfig(ctx, request);
+ case GET_CONTROLLER_CONFIG:
+ return this.handleGetControllerConfig(ctx, request);
+ case CLEAN_BROKER_DATA:
+ return this.handleCleanBrokerData(ctx, request);
+ default: {
+ final String error = " request type " + request.getCode() + " not supported";
+ return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
}
- case CONTROLLER_ELECT_MASTER: {
- final ElectMasterRequestHeader electMasterRequest = (ElectMasterRequestHeader) request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
- final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().electMaster(electMasterRequest);
- if (future != null) {
- final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
-
- if (null != responseHeader) {
- if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
- this.controllerManager.notifyBrokerRoleChanged(responseHeader, electMasterRequest.getClusterName());
- }
- }
- return response;
+ }
+ }
+
+ private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx,
+ RemotingCommand request) throws Exception {
+ final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
+ final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
+ final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+ private RemotingCommand handleControllerElectMaster(ChannelHandlerContext ctx,
+ RemotingCommand request) throws Exception {
+ final ElectMasterRequestHeader electMasterRequest = (ElectMasterRequestHeader) request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().electMaster(electMasterRequest);
+ if (future != null) {
+ final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
+
+ if (response.getCode() == ResponseCode.SUCCESS && responseHeader != null) {
+ if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
+ this.controllerManager.notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(responseHeader));
}
- break;
}
- case CONTROLLER_REGISTER_BROKER: {
- final RegisterBrokerToControllerRequestHeader controllerRequest = (RegisterBrokerToControllerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
- final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().registerBroker(controllerRequest);
- if (future != null) {
- final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- final RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
- if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
- this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
- responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
- controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(), controllerRequest.getElectionPriority());
- }
- return response;
- }
- break;
+ return response;
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+
+ private RemotingCommand handleControllerRegisterBroker(ChannelHandlerContext ctx,
+ RemotingCommand request) throws Exception {
+ final RegisterBrokerToControllerRequestHeader controllerRequest = (RegisterBrokerToControllerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().registerBroker(controllerRequest);
+ if (future != null) {
+ final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ final RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
+ if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
+ this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
+ responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
+ controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(), controllerRequest.getElectionPriority());
}
- case CONTROLLER_GET_REPLICA_INFO: {
- final GetReplicaInfoRequestHeader controllerRequest = (GetReplicaInfoRequestHeader) request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
- final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getReplicaInfo(controllerRequest);
+ return response;
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+ private RemotingCommand handleControllerGetReplicaInfo(ChannelHandlerContext ctx,
+ RemotingCommand request) throws Exception {
+ final GetReplicaInfoRequestHeader controllerRequest = (GetReplicaInfoRequestHeader) request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getReplicaInfo(controllerRequest);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+ private RemotingCommand handleControllerGetMetadataInfo(ChannelHandlerContext ctx, RemotingCommand request) {
+ return this.controllerManager.getController().getControllerMetadata();
+ }
+
+ private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+ final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
+ this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
+ requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
+ return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
+ }
+
+ private RemotingCommand handleControllerGetSyncStateData(ChannelHandlerContext ctx,
+ RemotingCommand request) throws Exception {
+ if (request.getBody() != null) {
+ final List<String> brokerNames = RemotingSerializable.decode(request.getBody(), List.class);
+ if (brokerNames != null && brokerNames.size() > 0) {
+ final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getSyncStateData(brokerNames);
if (future != null) {
return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
}
- break;
- }
- case CONTROLLER_GET_METADATA_INFO: {
- return this.controllerManager.getController().getControllerMetadata();
- }
- case BROKER_HEARTBEAT: {
- final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
- this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
- requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
- return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
- }
- case CONTROLLER_GET_SYNC_STATE_DATA: {
- if (request.getBody() != null) {
- final List<String> brokerNames = RemotingSerializable.decode(request.getBody(), List.class);
- if (brokerNames != null && brokerNames.size() > 0) {
- final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getSyncStateData(brokerNames);
- if (future != null) {
- return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- }
- }
- }
- break;
- }
- case UPDATE_CONTROLLER_CONFIG:
- return this.updateControllerConfig(ctx, request);
- case GET_CONTROLLER_CONFIG:
- return this.getControllerConfig(ctx, request);
- case CLEAN_BROKER_DATA:
- final CleanControllerBrokerDataRequestHeader requestHeader = (CleanControllerBrokerDataRequestHeader) request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
- final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().cleanBrokerData(requestHeader);
- if (null != future) {
- return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- }
- break;
- default: {
- final String error = " request type " + request.getCode() + " not supported";
- return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
}
}
return RemotingCommand.createResponseCommand(null);
}
- @Override
- public boolean rejectRequest() {
- return false;
+ private RemotingCommand handleCleanBrokerData(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+ final CleanControllerBrokerDataRequestHeader requestHeader = (CleanControllerBrokerDataRequestHeader) request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().cleanBrokerData(requestHeader);
+ if (null != future) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ return RemotingCommand.createResponseCommand(null);
}
- private RemotingCommand updateControllerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+ private RemotingCommand handleUpdateControllerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
if (ctx != null) {
log.info("updateConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
@@ -205,7 +232,7 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
return response;
}
- private RemotingCommand getControllerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+ private RemotingCommand handleGetControllerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
String content = this.controllerManager.getConfiguration().getAllConfigsFormatString();
@@ -225,4 +252,9 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
return response;
}
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+
}
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
index 6cef5978c..869a50e72 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
@@ -36,19 +36,24 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
+import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_MASTER_STILL_EXIST;
import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_NOT_LEADER;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class ControllerManagerTest {
@@ -131,7 +136,7 @@ public class ControllerManagerTest {
final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, heartbeatTimeoutMillis, 1, 1000L, 0);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
final RemotingCommand response = client.invokeSync(controllerAddress, request, 3000);
- assert response != null;
+ assertNotNull(response);
switch (response.getCode()) {
case SUCCESS: {
return (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
@@ -143,20 +148,45 @@ public class ControllerManagerTest {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
+ public RemotingCommand brokerTryElect(final String controllerAddress, final String clusterName,
+ final String brokerName, final String brokerAddress, final RemotingClient client) throws Exception {
+ final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerAddress);
+ final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, requestHeader);
+ RemotingCommand response = client.invokeSync(controllerAddress, request, 3000);
+ assertNotNull(response);
+ return response;
+ }
+
@Test
public void testSomeApi() throws Exception {
mockData();
final ControllerManager leader = waitLeader(this.controllers);
String leaderAddr = "localhost" + ":" + leader.getController().getRemotingServer().localListenPort();
- // Register two broker, the first one is master.
+ // Register two brokers
final RegisterBrokerToControllerResponseHeader responseHeader1 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8000", this.remotingClient, 1000L);
assert responseHeader1 != null;
- assertEquals(responseHeader1.getBrokerId(), MixAll.MASTER_ID);
+ assertEquals(responseHeader1.getBrokerId(), MixAll.FIRST_SLAVE_ID);
final RegisterBrokerToControllerResponseHeader responseHeader2 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8001", this.remotingClient1, 4000L);
assert responseHeader2 != null;
- assertEquals(responseHeader2.getBrokerId(), 2);
+ assertEquals(responseHeader2.getBrokerId(), MixAll.FIRST_SLAVE_ID + 1);
+
+ // Both brokers try to elect themselves as master, but only the first one can become the master
+ RemotingCommand tryElectCommand1 = brokerTryElect(leaderAddr, "cluster1", "broker1", "127.0.0.1:8000", this.remotingClient);
+ ElectMasterResponseHeader brokerTryElectResponseHeader1 = (ElectMasterResponseHeader) tryElectCommand1.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
+ RemotingCommand tryElectCommand2 = brokerTryElect(leaderAddr, "cluster1", "broker1", "127.0.0.1:8001", this.remotingClient1);
+ ElectMasterResponseHeader brokerTryElectResponseHeader2 = (ElectMasterResponseHeader) tryElectCommand2.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
+
+ assertEquals(SUCCESS, tryElectCommand1.getCode());
+ assertEquals("127.0.0.1:8000", brokerTryElectResponseHeader1.getMasterAddress());
+ assertEquals(1L, brokerTryElectResponseHeader1.getMasterEpoch());
+ assertEquals(1L, brokerTryElectResponseHeader1.getSyncStateSetEpoch());
+
+ assertEquals(CONTROLLER_MASTER_STILL_EXIST, tryElectCommand2.getCode());
+ assertEquals("127.0.0.1:8000", brokerTryElectResponseHeader2.getMasterAddress());
+ assertEquals(1L, brokerTryElectResponseHeader2.getMasterEpoch());
+ assertEquals(1L, brokerTryElectResponseHeader2.getSyncStateSetEpoch());
// Send heartbeat for broker2
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
index 84bf7c72b..873e76760 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
@@ -22,6 +22,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -94,8 +95,8 @@ public class DLedgerControllerTest {
}
}
- public boolean registerNewBroker(Controller leader, String clusterName, String brokerName, String brokerAddress,
- boolean isFirstRegisteredBroker) throws Exception {
+ public void registerNewBroker(Controller leader, String clusterName, String brokerName, String brokerAddress,
+ long expectBrokerId) throws Exception {
// Register new broker
final RegisterBrokerToControllerRequestHeader registerRequest = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerAddress);
RemotingCommand response = await().atMost(Duration.ofSeconds(20)).until(() -> {
@@ -109,14 +110,22 @@ public class DLedgerControllerTest {
e.printStackTrace();
return null;
}
- }, item -> item != null);
+ }, Objects::nonNull);
final RegisterBrokerToControllerResponseHeader registerResult = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
- if (!isFirstRegisteredBroker) {
- assertTrue(registerResult.getBrokerId() > 0);
- }
- return true;
+ assertEquals(expectBrokerId, registerResult.getBrokerId());
+ }
+
+ public void brokerTryElectMaster(Controller leader, String clusterName, String brokerName, String brokerAddress,
+ boolean exceptSuccess) {
+ final ElectMasterRequestHeader electMasterRequestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerAddress);
+ RemotingCommand command = await().atMost(Duration.ofSeconds(20)).until(() -> {
+ return leader.electMaster(electMasterRequestHeader).get(2, TimeUnit.SECONDS);
+ }, Objects::nonNull);
+
+ ElectMasterResponseHeader header = (ElectMasterResponseHeader) command.readCustomHeader();
+ assertEquals(exceptSuccess, ResponseCode.SUCCESS == command.getCode());
}
private boolean alterNewInSyncSet(Controller leader, String brokerName, String masterAddress, int masterEpoch,
@@ -167,14 +176,18 @@ public class DLedgerControllerTest {
DLedgerController leader = waitLeader(controllers);
- assertTrue(registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9000", true));
- assertTrue(registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9001", true));
- assertTrue(registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9002", true));
+ // register
+ registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9000", 1L);
+ registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9001", 2L);
+ registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9002", 3L);
+ // try elect
+ brokerTryElectMaster(leader, "cluster1", "broker1", "127.0.0.1:9000", true);
+ brokerTryElectMaster(leader, "cluster1", "broker1", "127.0.0.1:9001", false);
+ brokerTryElectMaster(leader, "cluster1", "broker1", "127.0.0.1:9002", false);
final RemotingCommand getInfoResponse = leader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS);
final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) getInfoResponse.readCustomHeader();
- assertEquals(replicaInfo.getMasterEpoch(), 1);
- assertEquals(replicaInfo.getMasterAddress(), "127.0.0.1:9000");
-
+ assertEquals(1, replicaInfo.getMasterEpoch());
+ assertEquals("127.0.0.1:9000", replicaInfo.getMasterAddress());
// Try alter sync state set
final HashSet<String> newSyncStateSet = new HashSet<>();
newSyncStateSet.add("127.0.0.1:9000");
@@ -209,13 +222,13 @@ public class DLedgerControllerTest {
@Test
public void testElectMaster() throws Exception {
final DLedgerController leader = mockMetaData(false);
- final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+ final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1");
setBrokerElectPolicy(leader, "127.0.0.1:9000");
final RemotingCommand resp = leader.electMaster(request).get(10, TimeUnit.SECONDS);
final ElectMasterResponseHeader response = (ElectMasterResponseHeader) resp.readCustomHeader();
assertEquals(response.getMasterEpoch(), 2);
- assertFalse(response.getNewMasterAddress().isEmpty());
- assertNotEquals(response.getNewMasterAddress(), "127.0.0.1:9000");
+ assertFalse(response.getMasterAddress().isEmpty());
+ assertNotEquals(response.getMasterAddress(), "127.0.0.1:9000");
}
@Test
@@ -228,7 +241,7 @@ public class DLedgerControllerTest {
// Now we trigger the electMaster API, which means the old master has shut down and a new master must be elected.
// However, the syncStateSet in the state machine is {"127.0.0.1:9000"}, so no other replica can be elected as master and the election will fail.
- final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
+ final ElectMasterRequestHeader electRequest = ElectMasterRequestHeader.ofControllerTrigger("broker1");
setBrokerElectPolicy(leader, "127.0.0.1:9000");
leader.electMaster(electRequest).get(10, TimeUnit.SECONDS);
@@ -240,19 +253,17 @@ public class DLedgerControllerTest {
assertEquals(replicaInfo.getMasterAddress(), "");
assertEquals(replicaInfo.getMasterEpoch(), 2);
- // Now, we start broker1 - 127.0.0.1:9001, but it was not in syncStateSet, so it will not be elected as master.
- final RegisterBrokerToControllerRequestHeader request1 =
- new RegisterBrokerToControllerRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
- final RegisterBrokerToControllerResponseHeader r1 = (RegisterBrokerToControllerResponseHeader) leader.registerBroker(request1).get(10, TimeUnit.SECONDS).readCustomHeader();
- assertEquals(r1.getBrokerId(), 2);
+ // Now broker1 - 127.0.0.1:9001 starts and tries to elect itself, but it is not in the syncStateSet, so it will not be elected as master.
+ final ElectMasterRequestHeader request1 =
+ ElectMasterRequestHeader.ofBrokerTrigger("cluster1", "broker1", "127.0.0.1:9001");
+ final ElectMasterResponseHeader r1 = (ElectMasterResponseHeader) leader.electMaster(request1).get(10, TimeUnit.SECONDS).readCustomHeader();
assertEquals(r1.getMasterAddress(), "");
- assertEquals(r1.getMasterEpoch(), 2);
- // Now, we start broker1 - 127.0.0.1:9000, it will be elected as master
- final RegisterBrokerToControllerRequestHeader request2 =
- new RegisterBrokerToControllerRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
- final RegisterBrokerToControllerResponseHeader r2 = (RegisterBrokerToControllerResponseHeader) leader.registerBroker(request2).get(10, TimeUnit.SECONDS).readCustomHeader();
- assertEquals(r2.getBrokerId(), 0);
+ // Now broker1 - 127.0.0.1:9000 starts and tries to elect itself; it will be elected as master
+ setBrokerElectPolicy(leader);
+ final ElectMasterRequestHeader request2 =
+ ElectMasterRequestHeader.ofBrokerTrigger("cluster1", "broker1", "127.0.0.1:9000");
+ final ElectMasterResponseHeader r2 = (ElectMasterResponseHeader) leader.electMaster(request2).get(10, TimeUnit.SECONDS).readCustomHeader();
assertEquals(r2.getMasterAddress(), "127.0.0.1:9000");
assertEquals(r2.getMasterEpoch(), 3);
}
@@ -268,7 +279,7 @@ public class DLedgerControllerTest {
// Now we trigger the electMaster API, which means the old master has shut down and a new master must be elected.
// Even though the syncStateSet in the state machine is only {"127.0.0.1:9000"},
// the option {enableElectUncleanMaster = true} is set, so the controller can still elect a new master
- final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
+ final ElectMasterRequestHeader electRequest = ElectMasterRequestHeader.ofControllerTrigger("broker1");
setBrokerElectPolicy(leader, "127.0.0.1:9000");
final CompletableFuture<RemotingCommand> future = leader.electMaster(electRequest);
future.get(10, TimeUnit.SECONDS);
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index d3b03dfe9..57d372349 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -16,11 +16,13 @@
*/
package org.apache.rocketmq.controller.impl.controller.impl.manager;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ControllerConfig;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.controller.elect.ElectPolicy;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
@@ -30,6 +32,7 @@ import org.apache.rocketmq.controller.impl.event.EventMessage;
import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetResponseHeader;
@@ -55,11 +58,16 @@ public class ReplicasInfoManagerTest {
private DefaultBrokerHeartbeatManager heartbeatManager;
+ private ControllerConfig config;
+
+ private ElectPolicy electPolicy;
+
@Before
public void init() {
- final ControllerConfig config = new ControllerConfig();
- config.setEnableElectUncleanMaster(false);
- config.setScanNotActiveBrokerInterval(300000000);
+ this.electPolicy = new DefaultElectPolicy((clusterName, brokerAddr) -> true, null);
+ this.config = new ControllerConfig();
+ this.config.setEnableElectUncleanMaster(false);
+ this.config.setScanNotActiveBrokerInterval(300000000);
this.replicasInfoManager = new ReplicasInfoManager(config);
this.heartbeatManager = new DefaultBrokerHeartbeatManager(config);
this.heartbeatManager.start();
@@ -72,24 +80,67 @@ public class ReplicasInfoManagerTest {
this.heartbeatManager = null;
}
- public boolean registerNewBroker(String clusterName, String brokerName, String brokerAddress,
- boolean isFirstRegisteredBroker) {
+ public void registerNewBroker(String clusterName, String brokerName, String brokerAddress,
+ long exceptBrokerId, String exceptMasterAddress) {
// Register new broker
final RegisterBrokerToControllerRequestHeader registerRequest =
new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerAddress);
final ControllerResult<RegisterBrokerToControllerResponseHeader> registerResult = this.replicasInfoManager.registerBroker(registerRequest, (s, v) -> true);
apply(registerResult.getEvents());
+ // check response
+ assertEquals(ResponseCode.SUCCESS, registerResult.getResponseCode());
+ assertEquals(exceptBrokerId, registerResult.getResponse().getBrokerId());
+ assertEquals(exceptMasterAddress, registerResult.getResponse().getMasterAddress());
+ // check it in state machine
+ final GetReplicaInfoResponseHeader replicaInfo = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, brokerAddress)).getResponse();
+ assertEquals(exceptBrokerId, replicaInfo.getBrokerId());
+ }
+
+ public void brokerElectMaster(String clusterName, long brokerId, String brokerName, String brokerAddress,
+ boolean isFirstTryElect) {
- if (isFirstRegisteredBroker) {
- final ControllerResult<GetReplicaInfoResponseHeader> getInfoResult = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName));
- final GetReplicaInfoResponseHeader replicaInfo = getInfoResult.getResponse();
- assertEquals(replicaInfo.getMasterAddress(), brokerAddress);
- assertEquals(replicaInfo.getMasterEpoch(), 1);
+ final GetReplicaInfoResponseHeader replicaInfoBefore = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, brokerAddress)).getResponse();
+ byte[] body = this.replicasInfoManager.getSyncStateData(Arrays.asList(brokerName)).getBody();
+ InSyncStateData syncStateDataBefore = RemotingSerializable.decode(body, InSyncStateData.class);
+ // Try to elect itself as master
+ ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerAddress);
+ final ControllerResult<ElectMasterResponseHeader> result = this.replicasInfoManager.electMaster(requestHeader, this.electPolicy);
+ apply(result.getEvents());
+
+ final GetReplicaInfoResponseHeader replicaInfoAfter = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, brokerAddress)).getResponse();
+ final ElectMasterResponseHeader response = result.getResponse();
+
+ if (isFirstTryElect) {
+ // it should be elected
+ // check response
+ assertEquals(ResponseCode.SUCCESS, result.getResponseCode());
+ assertEquals(1, response.getMasterEpoch());
+ assertEquals(1, response.getSyncStateSetEpoch());
+ assertEquals(brokerAddress, response.getMasterAddress());
+ // check it in state machine
+ assertEquals(brokerAddress, replicaInfoAfter.getMasterAddress());
+ assertEquals(1, replicaInfoAfter.getMasterEpoch());
+ assertEquals(brokerId, replicaInfoAfter.getBrokerId());
} else {
- final RegisterBrokerToControllerResponseHeader response = registerResult.getResponse();
- assertTrue(response.getBrokerId() > 0);
+
+ // fails because the current master still exists
+ if (StringUtils.isNotEmpty(replicaInfoBefore.getMasterAddress())) {
+ assertEquals(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, result.getResponseCode());
+ assertEquals(replicaInfoBefore.getMasterAddress(), response.getMasterAddress());
+ assertEquals(replicaInfoBefore.getMasterEpoch(), response.getMasterEpoch());
+ assertEquals(brokerId, replicaInfoAfter.getBrokerId());
+ return;
+ }
+ if (syncStateDataBefore.getInSyncStateTable().containsKey(brokerAddress) || this.config.isEnableElectUncleanMaster()) {
+ // can be elected successfully
+ assertEquals(ResponseCode.SUCCESS, result.getResponseCode());
+ assertEquals(MixAll.MASTER_ID, replicaInfoAfter.getBrokerId());
+ assertEquals(brokerId, replicaInfoAfter.getBrokerId());
+ } else {
+ // fails because no broker could be elected
+ assertEquals(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, result.getResponseCode());
+ }
}
- return true;
}
@Test
@@ -102,6 +153,13 @@ public class ReplicasInfoManagerTest {
new RegisterBrokerToControllerRequestHeader("default", "brokerName-a", "127.0.0.1:9001");
final ControllerResult<RegisterBrokerToControllerResponseHeader> registerResult0 = this.replicasInfoManager.registerBroker(registerRequest0, (s, v) -> true);
apply(registerResult0.getEvents());
+ final ElectMasterRequestHeader electMasterRequest = ElectMasterRequestHeader.ofBrokerTrigger("default", "brokerName-a", "127.0.0.1:9000");
+ ControllerResult<ElectMasterResponseHeader> electMasterResponseHeaderControllerResult = this.replicasInfoManager.electMaster(electMasterRequest, new DefaultElectPolicy());
+ apply(electMasterResponseHeaderControllerResult.getEvents());
+ final ControllerResult<GetReplicaInfoResponseHeader> getInfoResult = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader("brokerName-a"));
+ final GetReplicaInfoResponseHeader replicaInfo = getInfoResult.getResponse();
+ assertEquals("127.0.0.1:9000", replicaInfo.getMasterAddress());
+ assertEquals(1, replicaInfo.getMasterEpoch());
final HashSet<String> newSyncStateSet = new HashSet<>();
newSyncStateSet.add("127.0.0.1:9000");
newSyncStateSet.add("127.0.0.1:9001");
@@ -110,10 +168,17 @@ public class ReplicasInfoManagerTest {
new RegisterBrokerToControllerRequestHeader("default", "brokerName-a", "127.0.0.1:9002");
final ControllerResult<RegisterBrokerToControllerResponseHeader> registerResult1 = this.replicasInfoManager.registerBroker(registerRequest1, (s, v) -> StringUtils.equals(v, "127.0.0.1:9001"));
apply(registerResult1.getEvents());
- final ControllerResult<GetReplicaInfoResponseHeader> getInfoResult = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader("brokerName-a"));
- final GetReplicaInfoResponseHeader replicaInfo = getInfoResult.getResponse();
- assertEquals(replicaInfo.getMasterAddress(), "127.0.0.1:9001");
- assertEquals(replicaInfo.getMasterEpoch(), 2);
+ assertEquals(3, registerResult1.getResponse().getBrokerId());
+ assertEquals("", registerResult1.getResponse().getMasterAddress());
+ ElectPolicy electPolicy1 = new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"),null);
+ final ElectMasterRequestHeader electMasterRequest1 = ElectMasterRequestHeader.ofBrokerTrigger("default", "brokerName-a", "127.0.0.1:9002");
+ ControllerResult<ElectMasterResponseHeader> electMasterResponseHeaderControllerResult1 = this.replicasInfoManager.electMaster(electMasterRequest1, electPolicy1);
+ apply(electMasterResponseHeaderControllerResult1.getEvents());
+ final ControllerResult<GetReplicaInfoResponseHeader> getInfoResult0 = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader("brokerName-a"));
+ final GetReplicaInfoResponseHeader replicaInfo0 = getInfoResult0.getResponse();
+ assertEquals(replicaInfo0.getMasterAddress(), "127.0.0.1:9001");
+ assertTrue(replicaInfo0.getMasterAddress().equals("127.0.0.1:9001") || replicaInfo0.getMasterAddress().equals("127.0.0.1:9002"));
+ assertEquals(replicaInfo0.getMasterEpoch(), 2);
}
private boolean alterNewInSyncSet(String brokerName, String masterAddress, int masterEpoch,
@@ -139,9 +204,12 @@ public class ReplicasInfoManagerTest {
}
public void mockMetaData() {
- registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", true);
- registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", false);
- registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", false);
+ registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, "");
+ registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", 2L, "");
+ registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", 3L, "");
+ brokerElectMaster("cluster1", 1L, "broker1", "127.0.0.1:9000", true);
+ brokerElectMaster("cluster1", 2L, "broker1", "127.0.0.1:9001", false);
+ brokerElectMaster("cluster1", 3L, "broker1", "127.0.0.1:9002", false);
final HashSet<String> newSyncStateSet = new HashSet<>();
newSyncStateSet.add("127.0.0.1:9000");
newSyncStateSet.add("127.0.0.1:9001");
@@ -185,43 +253,63 @@ public class ReplicasInfoManagerTest {
1, 3L, -1L, 1);
}
+ @Test
+ public void testRegisterBrokerSuccess() {
+ registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, "");
+ registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", 2L, "");
+ registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", 3L, "");
+ brokerElectMaster("cluster1", 1L, "broker1", "127.0.0.1:9000", true);
+ brokerElectMaster("cluster1", 2L, "broker1", "127.0.0.1:9001", false);
+ brokerElectMaster("cluster1", 3L, "broker1", "127.0.0.1:9002", false);
+ }
+
+ @Test
+ public void testRegisterWithMasterExistResp() {
+ registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, "");
+ registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", 2L, "");
+ brokerElectMaster("cluster1", 1L, "broker1", "127.0.0.1:9000", true);
+ brokerElectMaster("cluster1", 2L, "broker1", "127.0.0.1:9001", false);
+ registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", 3L, "127.0.0.1:9000");
+ brokerElectMaster("cluster1", 3L, "broker1", "127.0.0.1:9002", false);
+ }
+
@Test
public void testElectMasterOldMasterStillAlive() {
mockMetaData();
- final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+ final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1");
ElectPolicy electPolicy = new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo);
mockHeartbeatDataMasterStillAlive();
final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request,
electPolicy);
- assertEquals(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, cResult.getResponseCode());
+ assertEquals(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, cResult.getResponseCode());
}
@Test
public void testElectMasterPreferHigherEpoch() {
mockMetaData();
- final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+ final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1");
ElectPolicy electPolicy = new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo);
mockHeartbeatDataHigherEpoch();
final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request,
electPolicy);
final ElectMasterResponseHeader response = cResult.getResponse();
assertEquals(response.getMasterEpoch(), 2);
- assertFalse(response.getNewMasterAddress().isEmpty());
- assertEquals("127.0.0.1:9001", response.getNewMasterAddress());
+ assertFalse(response.getMasterAddress().isEmpty());
+ assertEquals("127.0.0.1:9001", response.getMasterAddress());
}
@Test
public void testElectMasterPreferHigherOffsetWhenEpochEquals() {
mockMetaData();
- final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+ final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1");
ElectPolicy electPolicy = new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo);
mockHeartbeatDataHigherOffset();
final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request,
electPolicy);
final ElectMasterResponseHeader response = cResult.getResponse();
assertEquals(response.getMasterEpoch(), 2);
- assertFalse(response.getNewMasterAddress().isEmpty());
- assertEquals("127.0.0.1:9002", response.getNewMasterAddress());
+ assertFalse(response.getMasterAddress().isEmpty());
+ assertEquals("127.0.0.1:9002", response.getMasterAddress());
}
@Test
@@ -234,45 +322,51 @@ public class ReplicasInfoManagerTest {
electPolicy);
final ElectMasterResponseHeader response = cResult.getResponse();
assertEquals(response.getMasterEpoch(), 2);
- assertFalse(response.getNewMasterAddress().isEmpty());
- assertEquals("127.0.0.1:9002", response.getNewMasterAddress());
+ assertFalse(response.getMasterAddress().isEmpty());
+ assertEquals("127.0.0.1:9002", response.getMasterAddress());
}
@Test
public void testElectMaster() {
mockMetaData();
- final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+ final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1");
final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request,
new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"), null));
final ElectMasterResponseHeader response = cResult.getResponse();
assertEquals(response.getMasterEpoch(), 2);
- assertFalse(response.getNewMasterAddress().isEmpty());
- assertNotEquals(response.getNewMasterAddress(), "127.0.0.1:9000");
+ assertFalse(response.getMasterAddress().isEmpty());
+ assertNotEquals(response.getMasterAddress(), "127.0.0.1:9000");
+
+ apply(cResult.getEvents());
final Set<String> brokerSet = new HashSet<>();
brokerSet.add("127.0.0.1:9000");
brokerSet.add("127.0.0.1:9001");
brokerSet.add("127.0.0.1:9002");
- final ElectMasterRequestHeader assignRequest = new ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
+ assertTrue(alterNewInSyncSet("broker1", response.getMasterAddress(), response.getMasterEpoch(), brokerSet, response.getSyncStateSetEpoch()));
+
+ // Test: the admin tries to elect an assigned master, but that broker is not alive
+ final ElectMasterRequestHeader assignRequest = ElectMasterRequestHeader.ofAdminTrigger("cluster1", "broker1", "127.0.0.1:9000");
final ControllerResult<ElectMasterResponseHeader> cResult1 = this.replicasInfoManager.electMaster(assignRequest,
- new DefaultElectPolicy((clusterName, brokerAddress) -> brokerAddress.contains("127.0.0.1:9000"), null));
+ new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"), null));
+
assertEquals(cResult1.getResponseCode(), ResponseCode.CONTROLLER_ELECT_MASTER_FAILED);
- final ElectMasterRequestHeader assignRequest1 = new ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
+ // Test: the admin tries to elect an assigned master while the old master is still alive, and the old master is the assigned master
+ final ElectMasterRequestHeader assignRequest1 = ElectMasterRequestHeader.ofAdminTrigger("cluster1", "broker1", response.getMasterAddress());
final ControllerResult<ElectMasterResponseHeader> cResult2 = this.replicasInfoManager.electMaster(assignRequest1,
- new DefaultElectPolicy((clusterName, brokerAddress) -> brokerAddress.equals("127.0.0.1:9000"), null));
- assertEquals(cResult2.getResponseCode(), ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE);
+ new DefaultElectPolicy((clusterName, brokerAddress) -> true, null));
+ assertEquals(cResult2.getResponseCode(), ResponseCode.CONTROLLER_MASTER_STILL_EXIST);
- final ElectMasterRequestHeader assignRequest2 = new ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
+ // The admin successfully elects an assigned master.
+ final ElectMasterRequestHeader assignRequest2 = ElectMasterRequestHeader.ofAdminTrigger("cluster1", "broker1", "127.0.0.1:9000");
final ControllerResult<ElectMasterResponseHeader> cResult3 = this.replicasInfoManager.electMaster(assignRequest2,
- new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"), null));
+ new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals(response.getMasterAddress()), null));
assertEquals(cResult3.getResponseCode(), ResponseCode.SUCCESS);
- final ElectMasterResponseHeader response3 = cResult3.getResponse();
- assertEquals(response3.getNewMasterAddress(), "127.0.0.1:9001");
- assertEquals(response.getMasterEpoch(), 2);
- assertFalse(response.getNewMasterAddress().isEmpty());
- assertNotEquals(response.getNewMasterAddress(), "127.0.0.1:9000");
+ final ElectMasterResponseHeader response3 = cResult3.getResponse();
+ assertEquals(response3.getMasterAddress(), "127.0.0.1:9000");
+ assertEquals(response3.getMasterEpoch(), 3);
}
@Test
@@ -284,7 +378,7 @@ public class ReplicasInfoManagerTest {
// Now we trigger electMaster api, which means the old master is shutdown and want to elect a new master.
// However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, not more replicas can be elected as master, it will be failed.
- final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
+ final ElectMasterRequestHeader electRequest = ElectMasterRequestHeader.ofControllerTrigger("broker1");
final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(electRequest,
new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"), null));
final List<EventMessage> events = cResult.getEvents();
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java
index e442217fa..8e4e3770e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java
@@ -113,10 +113,11 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int CONTROLLER_INVALID_CLEAN_BROKER_METADATA = 2009;
- public static final int CONTROLLER_ALTER_SYNC_STATE_SET_FAILED = 2010;
+ public static final int CONTROLLER_BROKER_NEED_TO_BE_REGISTERED = 2010;
- public static final int CONTROLLER_ELECT_MASTER_FAILED = 2011;
-
- public static final int CONTROLLER_REGISTER_BROKER_FAILED = 2012;
+ public static final int CONTROLLER_MASTER_STILL_EXIST = 2011;
+ public static final int CONTROLLER_ELECT_MASTER_FAILED = 2012;
+
+ public static final int CONTROLLER_ALTER_SYNC_STATE_SET_FAILED = 2013;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
new file mode 100644
index 000000000..2016b2968
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol.body;
+
+
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+
+public class RoleChangeNotifyEntry {
+
+ private final BrokerMemberGroup brokerMemberGroup;
+
+ private final String masterAddress;
+
+ private final int masterEpoch;
+
+ private final int syncStateSetEpoch;
+
+ public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String masterAddress, int masterEpoch, int syncStateSetEpoch) {
+ this.brokerMemberGroup = brokerMemberGroup;
+ this.masterAddress = masterAddress;
+ this.masterEpoch = masterEpoch;
+ this.syncStateSetEpoch = syncStateSetEpoch;
+ }
+
+ public static RoleChangeNotifyEntry convert(ElectMasterResponseHeader header) {
+ return new RoleChangeNotifyEntry(header.getBrokerMemberGroup(), header.getMasterAddress(), header.getMasterEpoch(), header.getSyncStateSetEpoch());
+ }
+
+
+ public BrokerMemberGroup getBrokerMemberGroup() {
+ return brokerMemberGroup;
+ }
+
+ public String getMasterAddress() {
+ return masterAddress;
+ }
+
+ public int getMasterEpoch() {
+ return masterEpoch;
+ }
+
+ public int getSyncStateSetEpoch() {
+ return syncStateSetEpoch;
+ }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java
index 134252007..bb9cfca3e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java
@@ -28,9 +28,19 @@ public class ElectMasterRequestHeader implements CommandCustomHeader {
@CFNotNull
private String brokerName;
+ /**
+ * The broker address.
+ * For a broker-triggered electMaster: this brokerAddress will be elected as master when it is the first election
+ * in this broker-set.
+ * For an admin-triggered electMaster: this brokerAddress is also called the assignedBrokerAddress, which means we must
+ * prefer to elect it as the new master when this broker is valid.
+ */
@CFNotNull
private String brokerAddress;
+ @CFNotNull
+ private Boolean forceElect = false;
+
public ElectMasterRequestHeader() {
}
@@ -44,6 +54,26 @@ public class ElectMasterRequestHeader implements CommandCustomHeader {
this.brokerAddress = brokerAddress;
}
+ public ElectMasterRequestHeader(String clusterName, String brokerName, String brokerAddress, boolean forceElect) {
+ this.clusterName = clusterName;
+ this.brokerName = brokerName;
+ this.brokerAddress = brokerAddress;
+ this.forceElect = forceElect;
+ }
+
+ public static ElectMasterRequestHeader ofBrokerTrigger(String clusterName, String brokerName,
+ String brokerAddress) {
+ return new ElectMasterRequestHeader(clusterName, brokerName, brokerAddress);
+ }
+
+ public static ElectMasterRequestHeader ofControllerTrigger(String brokerName) {
+ return new ElectMasterRequestHeader(brokerName);
+ }
+
+ public static ElectMasterRequestHeader ofAdminTrigger(String clusterName, String brokerName, String brokerAddress) {
+ return new ElectMasterRequestHeader(clusterName, brokerName, brokerAddress, true);
+ }
+
public String getBrokerName() {
return brokerName;
}
@@ -68,16 +98,39 @@ public class ElectMasterRequestHeader implements CommandCustomHeader {
this.clusterName = clusterName;
}
+ public boolean isForceElect() {
+ return this.forceElect;
+ }
+
@Override
public String toString() {
return "ElectMasterRequestHeader{" +
- "clusterName='" + clusterName + '\'' +
- ", brokerName='" + brokerName + '\'' +
- ", brokerAddress='" + brokerAddress + '\'' +
- '}';
+ "clusterName='" + clusterName + '\'' +
+ ", brokerName='" + brokerName + '\'' +
+ ", brokerAddress='" + brokerAddress + '\'' +
+ ", forceElect=" + forceElect +
+ '}';
}
@Override
public void checkFields() throws RemotingCommandException {
}
+
+ /**
+ * The trigger type of an elect-master request
+ */
+ public enum ElectMasterTriggerType {
+ /**
+ * Triggered by the broker
+ */
+ BROKER_TRIGGER,
+ /**
+ * Triggered by the controller
+ */
+ CONTROLLER_TRIGGER,
+ /**
+ * Triggered by the admin
+ */
+ ADMIN_TRIGGER
+ }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
index 04b602e81..811d64150 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
@@ -21,20 +21,22 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
public class ElectMasterResponseHeader implements CommandCustomHeader {
- private String newMasterAddress;
+ private String masterAddress;
private int masterEpoch;
private int syncStateSetEpoch;
private BrokerMemberGroup brokerMemberGroup;
+ private long brokerId = -1;
+
public ElectMasterResponseHeader() {
}
- public String getNewMasterAddress() {
- return newMasterAddress;
+ public String getMasterAddress() {
+ return masterAddress;
}
- public void setNewMasterAddress(String newMasterAddress) {
- this.newMasterAddress = newMasterAddress;
+ public void setMasterAddress(String masterAddress) {
+ this.masterAddress = masterAddress;
}
public int getMasterEpoch() {
@@ -61,13 +63,22 @@ public class ElectMasterResponseHeader implements CommandCustomHeader {
this.brokerMemberGroup = brokerMemberGroup;
}
+ public long getBrokerId() {
+ return brokerId;
+ }
+
+ public void setBrokerId(long brokerId) {
+ this.brokerId = brokerId;
+ }
+
@Override
public String toString() {
return "ElectMasterResponseHeader{" +
- "newMasterAddress='" + newMasterAddress + '\'' +
+ "masterAddress='" + masterAddress + '\'' +
", masterEpoch=" + masterEpoch +
", syncStateSetEpoch=" + syncStateSetEpoch +
", brokerMemberGroup=" + brokerMemberGroup +
+ ", brokerId=" + brokerId +
'}';
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
index a901bfbe3..6c1777e36 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
@@ -75,7 +75,7 @@ public class ReElectMasterSubCommand implements SubCommand {
final ElectMasterResponseHeader metaData = defaultMQAdminExt.electMaster(controllerAddress, clusterName, brokerName, brokerAddress);
System.out.printf("\n#ClusterName\t%s", clusterName);
System.out.printf("\n#BrokerName\t%s", brokerName);
- System.out.printf("\n#BrokerMasterAddr\t%s", metaData.getNewMasterAddress());
+ System.out.printf("\n#BrokerMasterAddr\t%s", metaData.getMasterAddress());
System.out.printf("\n#MasterEpoch\t%s", metaData.getMasterEpoch());
System.out.printf("\n#SyncStateSetEpoch\t%s\n", metaData.getSyncStateSetEpoch());
BrokerMemberGroup brokerMemberGroup = metaData.getBrokerMemberGroup();