You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/06/07 08:51:28 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of code] Let controller inform broker that role changed. (#4424)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new 98a283f5a [Summer of code] Let controller inform broker that role changed. (#4424)
98a283f5a is described below
commit 98a283f5ae2968f9d501155f9702e51a3e9bc214
Author: hzh0425 <64...@qq.com>
AuthorDate: Tue Jun 7 16:51:14 2022 +0800
[Summer of code] Let controller inform broker that role changed. (#4424)
* add broker api --notifyBrokerRoleChanged --
* add broker api --notifyBrokerRoleChanged --
* let controller inform broker when role changed
* code review
---
.../broker/hacontroller/ReplicasManager.java | 12 +++-
.../broker/processor/AdminBrokerProcessor.java | 21 ++++++
.../rocketmq/common/protocol/RequestCode.java | 2 +
.../NotifyBrokerRoleChangedRequestHeader.java | 83 ++++++++++++++++++++++
.../controller/ElectMasterResponseHeader.java | 24 ++++++-
.../rocketmq/controller/ControllerManager.java | 48 +++++++++++++
.../controller/impl/manager/BrokerInfo.java | 4 ++
.../impl/manager/ReplicasInfoManager.java | 17 +++++
.../namesrv/routeinfo/RouteInfoManager.java | 59 ++++++++++++++-
9 files changed, 265 insertions(+), 5 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index bb88b6c84..40ac5d1e2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -147,6 +147,16 @@ public class ReplicasManager {
this.scheduledService.shutdown();
}
+ /**
+ * Switches this broker's role after being told that the master changed.
+ * The request is ignored when the master address is empty or when the
+ * epoch is not newer than the master epoch this broker already knows.
+ *
+ * @param newMasterAddress address of the newly elected master
+ * @param newMasterEpoch master epoch carried by the notification
+ * @param syncStateSetEpoch sync-state-set epoch, forwarded when this broker becomes master
+ * @param brokerId broker id, forwarded when this broker becomes slave
+ */
+ public synchronized void changeBrokerRole(final String newMasterAddress, final int newMasterEpoch, final int syncStateSetEpoch, final long brokerId) {
+ if (StringUtils.isEmpty(newMasterAddress) || newMasterEpoch <= this.masterEpoch) {
+ return;
+ }
+
+ final boolean electedSelf = StringUtils.equals(newMasterAddress, this.localAddress);
+ if (electedSelf) {
+ changeToMaster(newMasterEpoch, syncStateSetEpoch);
+ return;
+ }
+ changeToSlave(newMasterAddress, newMasterEpoch, brokerId);
+ }
+
public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch) {
synchronized (this) {
if (newMasterEpoch > this.masterEpoch) {
@@ -193,7 +203,7 @@ public class ReplicasManager {
public void changeToSlave(final String newMasterAddress, final int newMasterEpoch, long brokerId) {
synchronized (this) {
if (newMasterEpoch > this.masterEpoch) {
- LOGGER.info("Begin to change to slave, brokerName={}, replicas:{}, brokerId={}", this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getBrokerId());
+ LOGGER.info("Begin to change to slave, brokerName={}, replicas={}, brokerId={}", this.brokerConfig.getBrokerName(), this.localAddress, brokerId);
brokerController.getMessageStore().disableWrite();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 3aaf4b935..7f8b2a437 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -124,6 +124,7 @@ import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListReque
import org.apache.rocketmq.common.protocol.header.GetSubscriptionGroupConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.NotifyBrokerRoleChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyMinBrokerIdChangeRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumeQueueRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader;
@@ -299,6 +300,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return this.resetMasterFlushOffset(ctx, request);
case RequestCode.GET_BROKER_EPOCH_CACHE:
return this.getBrokerEpochCache(ctx, request);
+ case RequestCode.NOTIFY_BROKER_ROLE_CHANGED:
+ return this.notifyBrokerRoleChanged(ctx, request);
default:
return getUnknownCmdResponse(ctx, request);
}
@@ -2390,4 +2393,22 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
response.setRemark(null);
return response;
}
+
+ /**
+ * Handles a NOTIFY_BROKER_ROLE_CHANGED request: decodes the header and, when a
+ * ReplicasManager is available, lets it decide whether this broker has to change role.
+ * The reply is always SUCCESS, also when no ReplicasManager is configured.
+ *
+ * @param ctx channel context of the incoming request (not used here)
+ * @param request the incoming command carrying a NotifyBrokerRoleChangedRequestHeader
+ * @return a response command with code SUCCESS
+ * @throws RemotingCommandException if the request header cannot be decoded
+ */
+ private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+ final NotifyBrokerRoleChangedRequestHeader header = request.decodeCommandCustomHeader(NotifyBrokerRoleChangedRequestHeader.class);
+ LOGGER.info("Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}", header);
+
+ final ReplicasManager manager = this.brokerController.getReplicasManager();
+ if (manager != null) {
+ manager.changeBrokerRole(header.getMasterAddress(), header.getMasterEpoch(), header.getSyncStateSetEpoch(), header.getBrokerId());
+ }
+
+ final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+ reply.setCode(ResponseCode.SUCCESS);
+ reply.setRemark(null);
+ return reply;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index b6313c583..a6c05f445 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -254,4 +254,6 @@ public class RequestCode {
public static final int CONTROLLER_GET_SYNC_STATE_DATA = 1006;
public static final int GET_BROKER_EPOCH_CACHE = 1007;
+
+ // Request sent to a broker after a master election so it can switch role; handled by AdminBrokerProcessor#notifyBrokerRoleChanged.
+ public static final int NOTIFY_BROKER_ROLE_CHANGED = 1008;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/NotifyBrokerRoleChangedRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/NotifyBrokerRoleChangedRequestHeader.java
new file mode 100644
index 000000000..33f159dd5
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/NotifyBrokerRoleChangedRequestHeader.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+/**
+ * Header of the NOTIFY_BROKER_ROLE_CHANGED request. It carries the result of a
+ * master election (master address and epochs) together with the id of the
+ * broker that receives the notification.
+ */
+public class NotifyBrokerRoleChangedRequestHeader implements CommandCustomHeader {
+ // Address of the newly elected master.
+ private String masterAddress;
+ // Master epoch that belongs to the election result.
+ private int masterEpoch;
+ // Sync-state-set epoch that belongs to the election result.
+ private int syncStateSetEpoch;
+ // The id of this broker.
+ private long brokerId;
+
+ /** Creates an empty header; the fields are filled through the setters. */
+ public NotifyBrokerRoleChangedRequestHeader() {
+ }
+
+ /**
+ * Creates a fully populated header.
+ *
+ * @param masterAddress address of the newly elected master
+ * @param masterEpoch master epoch of the election result
+ * @param syncStateSetEpoch sync-state-set epoch of the election result
+ * @param brokerId id of the broker being notified
+ */
+ public NotifyBrokerRoleChangedRequestHeader(String masterAddress, int masterEpoch, int syncStateSetEpoch, long brokerId) {
+ this.masterAddress = masterAddress;
+ this.masterEpoch = masterEpoch;
+ this.syncStateSetEpoch = syncStateSetEpoch;
+ this.brokerId = brokerId;
+ }
+
+ public String getMasterAddress() {
+ return this.masterAddress;
+ }
+
+ public void setMasterAddress(String masterAddress) {
+ this.masterAddress = masterAddress;
+ }
+
+ public int getMasterEpoch() {
+ return this.masterEpoch;
+ }
+
+ public void setMasterEpoch(int masterEpoch) {
+ this.masterEpoch = masterEpoch;
+ }
+
+ public int getSyncStateSetEpoch() {
+ return this.syncStateSetEpoch;
+ }
+
+ public void setSyncStateSetEpoch(int syncStateSetEpoch) {
+ this.syncStateSetEpoch = syncStateSetEpoch;
+ }
+
+ public long getBrokerId() {
+ return this.brokerId;
+ }
+
+ public void setBrokerId(long brokerId) {
+ this.brokerId = brokerId;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder text = new StringBuilder("NotifyBrokerRoleChangedRequestHeader{");
+ text.append("masterAddress='").append(masterAddress).append('\'');
+ text.append(", masterEpoch=").append(masterEpoch);
+ text.append(", syncStateSetEpoch=").append(syncStateSetEpoch);
+ text.append(", brokerId=").append(brokerId);
+ text.append('}');
+ return text.toString();
+ }
+
+ /** No field validation is performed for this header. */
+ @Override
+ public void checkFields() throws RemotingCommandException {
+
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterResponseHeader.java
index 1adcfe3b7..5dc02c421 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterResponseHeader.java
@@ -16,12 +16,15 @@
*/
package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class ElectMasterResponseHeader implements CommandCustomHeader {
private String newMasterAddress;
private int masterEpoch;
+ private int syncStateSetEpoch;
+ private BrokerMemberGroup brokerMemberGroup;
public ElectMasterResponseHeader() {
}
@@ -42,11 +45,28 @@ public class ElectMasterResponseHeader implements CommandCustomHeader {
this.masterEpoch = masterEpoch;
}
- @Override
- public String toString() {
+ /** Returns the sync-state-set epoch recorded with this election result. */
+ public int getSyncStateSetEpoch() {
+ return syncStateSetEpoch;
+ }
+
+ /** Stores the sync-state-set epoch that belongs to this election result. */
+ public void setSyncStateSetEpoch(int syncStateSetEpoch) {
+ this.syncStateSetEpoch = syncStateSetEpoch;
+ }
+
+ /**
+ * Returns the member group attached to this election result; may be null when none was set.
+ * NOTE(review): this is a structured object stored in a command header - confirm the remoting
+ * layer can encode/decode it if this header is ever sent over the wire.
+ */
+ public BrokerMemberGroup getBrokerMemberGroup() {
+ return brokerMemberGroup;
+ }
+
+ /** Attaches the member group (broker id to address) of the broker set the election was run for. */
+ public void setBrokerMemberGroup(BrokerMemberGroup brokerMemberGroup) {
+ this.brokerMemberGroup = brokerMemberGroup;
+ }
+
+ /** Renders the header's fields as text; the member group is printed under the label "brokerMember". */
+ @Override public String toString() {
return "ElectMasterResponseHeader{" +
"newMasterAddress='" + newMasterAddress + '\'' +
", masterEpoch=" + masterEpoch +
+ ", syncStateSetEpoch=" + syncStateSetEpoch +
+ ", brokerMember=" + brokerMemberGroup +
'}';
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 01f87903d..5f800a5a6 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.controller;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -31,6 +32,8 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.common.namesrv.ControllerConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
+import org.apache.rocketmq.common.protocol.header.NotifyBrokerRoleChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.controller.impl.DLedgerController;
@@ -38,8 +41,10 @@ import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
import org.apache.rocketmq.controller.processor.ControllerRequestProcessor;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -51,6 +56,7 @@ public class ControllerManager {
private final NettyClientConfig nettyClientConfig;
private final BrokerHousekeepingService brokerHousekeepingService;
private final Configuration configuration;
+ private final RemotingClient remotingClient;
private Controller controller;
private BrokerHeartbeatManager heartbeatManager;
private ExecutorService controllerRequestExecutor;
@@ -67,6 +73,7 @@ public class ControllerManager {
this.controllerConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.controllerConfig, "configStorePath");
+ this.remotingClient = new NettyRemotingClient(nettyClientConfig);
}
public boolean initialize() {
@@ -102,6 +109,7 @@ public class ControllerManager {
if (StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
heartbeatManager.changeBrokerMetadata(clusterName, responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
}
+ notifyBrokerMasterChanged(responseHeader, clusterName);
}
} catch (Exception ignored) {
}
@@ -115,6 +123,44 @@ public class ControllerManager {
return true;
}
+ /**
+ * Notify master and all slaves for a broker that the master role changed.
+ */
+ public void notifyBrokerMasterChanged(final ElectMasterResponseHeader electMasterResult, final String clusterName) {
+ final BrokerMemberGroup memberGroup = electMasterResult.getBrokerMemberGroup();
+ if (memberGroup == null) {
+ // Without a member list there is nobody to address.
+ return;
+ }
+
+ // First, inform the master
+ final String master = electMasterResult.getNewMasterAddress();
+ if (StringUtils.isNotEmpty(master) && this.heartbeatManager.isBrokerActive(clusterName, master)) {
+ doNotifyBrokerRoleChanged(master, MixAll.MASTER_ID, electMasterResult);
+ }
+
+ // Then, inform all slaves
+ final Map<Long, String> brokerIdAddrs = memberGroup.getBrokerAddrs();
+ if (brokerIdAddrs == null) {
+ return;
+ }
+ for (Map.Entry<Long, String> broker : brokerIdAddrs.entrySet()) {
+ final Long id = broker.getKey();
+ final String addr = broker.getValue();
+ // Skip incomplete entries instead of failing: one member without id or address
+ // must not prevent the remaining members from being notified.
+ if (id == null || StringUtils.isEmpty(addr)) {
+ continue;
+ }
+ if (!StringUtils.equals(addr, master) && this.heartbeatManager.isBrokerActive(clusterName, addr)) {
+ doNotifyBrokerRoleChanged(addr, id, electMasterResult);
+ }
+ }
+ }
+
+ /**
+ * Sends a one-way NOTIFY_BROKER_ROLE_CHANGED request to a single broker.
+ * Nothing is sent when the address is empty; a failed send is logged and not rethrown.
+ *
+ * @param brokerAddr address of the broker to notify
+ * @param brokerId id placed into the request header for that broker
+ * @param responseHeader election result providing master address and epochs
+ */
+ public void doNotifyBrokerRoleChanged(final String brokerAddr, final Long brokerId,
+ final ElectMasterResponseHeader responseHeader) {
+ if (StringUtils.isEmpty(brokerAddr)) {
+ return;
+ }
+
+ log.info("Try notify broker {} with id {} that role changed, responseHeader:{}", brokerAddr, brokerId, responseHeader);
+ final String masterAddress = responseHeader.getNewMasterAddress();
+ final int masterEpoch = responseHeader.getMasterEpoch();
+ final int syncStateSetEpoch = responseHeader.getSyncStateSetEpoch();
+ final NotifyBrokerRoleChangedRequestHeader header = new NotifyBrokerRoleChangedRequestHeader(masterAddress, masterEpoch, syncStateSetEpoch, brokerId);
+ final RemotingCommand notification = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, header);
+ try {
+ // 3000 is presumably the send timeout in milliseconds - confirm against RemotingClient.
+ this.remotingClient.invokeOneway(brokerAddr, notification, 3000);
+ } catch (final Exception e) {
+ log.error("Failed to notify broker {} with id {} that role changed", brokerAddr, brokerId, e);
+ }
+ }
+
public void registerProcessor() {
final ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this);
final RemotingServer controllerRemotingServer = this.controller.getRemotingServer();
@@ -131,12 +177,14 @@ public class ControllerManager {
public void start() {
this.heartbeatManager.start();
this.controller.startup();
+ this.remotingClient.start();
}
public void shutdown() {
this.heartbeatManager.shutdown();
this.controllerRequestExecutor.shutdown();
this.controller.shutdown();
+ this.remotingClient.shutdown();
}
public BrokerHeartbeatManager getHeartbeatManager() {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
index f819ff1b3..2c283c08c 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
@@ -62,6 +62,10 @@ public class BrokerInfo {
return new HashSet<>(this.brokerIdTable.keySet());
}
+ /**
+ * Returns a copy of the address-to-brokerId table, so changes made by the caller
+ * do not affect the map held by this object.
+ */
+ public HashMap<String, Long> getBrokerIdTable() {
+ return new HashMap<>(this.brokerIdTable);
+ }
+
public Long getBrokerId(final String address) {
if (this.brokerIdTable.containsKey(address)) {
return this.brokerIdTable.get(address);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index f0b680f73..eefb7feeb 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.namesrv.ControllerConfig;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.common.protocol.body.InSyncStateData;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
@@ -194,11 +195,14 @@ public class ReplicasInfoManager {
private boolean tryElectMaster(final ControllerResult<ElectMasterResponseHeader> result, final String brokerName,
final Set<String> candidates, final Predicate<String> filter) {
final int masterEpoch = this.syncStateSetInfoTable.get(brokerName).getMasterEpoch();
+ final int syncStateSetEpoch = this.syncStateSetInfoTable.get(brokerName).getSyncStateSetEpoch();
for (final String candidate : candidates) {
if (filter.test(candidate)) {
final ElectMasterResponseHeader response = result.getResponse();
response.setNewMasterAddress(candidate);
response.setMasterEpoch(masterEpoch + 1);
+ response.setSyncStateSetEpoch(syncStateSetEpoch);
+ response.setBrokerMemberGroup(buildBrokerMemberGroup(brokerName));
final ElectMasterEvent event = new ElectMasterEvent(brokerName, candidate);
result.addEvent(event);
@@ -208,6 +212,19 @@ public class ReplicasInfoManager {
return false;
}
+ /**
+ * Builds the member group (broker id to address) of the given broker set by
+ * inverting its address-to-id table.
+ *
+ * @param brokerName name of the broker set
+ * @return the member group, or null when isContainsBroker(brokerName) is false
+ */
+ private BrokerMemberGroup buildBrokerMemberGroup(final String brokerName) {
+ if (!isContainsBroker(brokerName)) {
+ return null;
+ }
+
+ final BrokerInfo info = this.replicaInfoTable.get(brokerName);
+ final BrokerMemberGroup result = new BrokerMemberGroup(info.getClusterName(), brokerName);
+ final HashMap<String, Long> idsByAddress = info.getBrokerIdTable();
+ final HashMap<Long, String> addressesById = new HashMap<>();
+ for (final String address : idsByAddress.keySet()) {
+ addressesById.put(idsByAddress.get(address), address);
+ }
+ result.setBrokerAddrs(addressesById);
+ return result;
+ }
+
public ControllerResult<BrokerRegisterResponseHeader> registerBroker(final BrokerRegisterRequestHeader request) {
final String brokerName = request.getBrokerName();
final String brokerAddress = request.getBrokerAddress();
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index f1e89f59c..4872bb87e 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -28,10 +28,13 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BrokerAddrInfo;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
@@ -46,9 +49,11 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.header.NotifyBrokerRoleChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyMinBrokerIdChangeRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -601,8 +606,21 @@ public class RouteInfoManager {
// Check whether we need to elect a new master
if (this.namesrvController != null && this.namesrvController.getControllerConfig().isEnableStartupController() && this.controller != null) {
- if (unRegisterRequest.getBrokerId() == 0) {
- this.controller.electMaster(new ElectMasterRequestHeader(unRegisterRequest.getBrokerName()));
+ if (unRegisterRequest.getBrokerId() == MixAll.MASTER_ID) {
+ if (this.controller.isLeaderState()) {
+ final CompletableFuture<RemotingCommand> future = this.controller.electMaster(new ElectMasterRequestHeader(unRegisterRequest.getBrokerName()));
+ try {
+ // Waits at most 5 seconds for the election result before giving up.
+ final RemotingCommand response = future.get(5, TimeUnit.SECONDS);
+ final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
+ if (responseHeader != null) {
+ // Placeholders now match the arguments (the old message had three placeholders for two values).
+ log.info("Broker {}'s master shutdown, elect a new master done, result:{}", brokerName, responseHeader);
+ notifyBrokerMasterChanged(responseHeader, clusterName);
+ }
+ } catch (final InterruptedException e) {
+ // Keep the interrupt visible to the caller instead of silently dropping it.
+ Thread.currentThread().interrupt();
+ log.warn("Interrupted while electing a new master for broker {}", brokerName, e);
+ } catch (final Exception e) {
+ // Still best effort, but a failed election is no longer invisible.
+ log.warn("Failed to elect a new master for broker {}", brokerName, e);
+ }
+ } else {
+ log.info("Broker {}'s master shutdown", brokerName);
+ }
+ }
}
}
@@ -910,6 +928,43 @@ public class RouteInfoManager {
}
}
+ /**
+ * Notify master and all slaves for a broker that the master role changed.
+ */
+ private void notifyBrokerMasterChanged(final ElectMasterResponseHeader electMasterResult, final String clusterName) {
+ final BrokerMemberGroup memberGroup = electMasterResult.getBrokerMemberGroup();
+ if (memberGroup == null) {
+ // Without a member list there is nobody to address.
+ return;
+ }
+
+ // First, inform the master
+ final String master = electMasterResult.getNewMasterAddress();
+ if (StringUtils.isNotEmpty(master) && isBrokerAlive(clusterName, master)) {
+ doNotifyBrokerRoleChanged(master, MixAll.MASTER_ID, electMasterResult);
+ }
+
+ // Then, inform all slaves
+ final Map<Long, String> brokerIdAddrs = memberGroup.getBrokerAddrs();
+ if (brokerIdAddrs == null) {
+ return;
+ }
+ for (Map.Entry<Long, String> broker : brokerIdAddrs.entrySet()) {
+ final Long id = broker.getKey();
+ final String addr = broker.getValue();
+ // Skip incomplete entries instead of failing: one member without id or address
+ // must not prevent the remaining members from being notified.
+ if (id == null || StringUtils.isEmpty(addr)) {
+ continue;
+ }
+ if (!StringUtils.equals(addr, master) && isBrokerAlive(clusterName, addr)) {
+ doNotifyBrokerRoleChanged(addr, id, electMasterResult);
+ }
+ }
+ }
+
+ /**
+ * Sends a one-way NOTIFY_BROKER_ROLE_CHANGED request to a single broker.
+ * Nothing is sent when the address is empty; a failed send is logged and not rethrown.
+ *
+ * @param brokerAddr address of the broker to notify
+ * @param brokerId id placed into the request header for that broker
+ * @param responseHeader election result providing master address and epochs
+ */
+ private void doNotifyBrokerRoleChanged(final String brokerAddr, final Long brokerId, final ElectMasterResponseHeader responseHeader) {
+ if (StringUtils.isEmpty(brokerAddr)) {
+ return;
+ }
+
+ log.info("Try notify broker {} with id {} that role changed, responseHeader:{}", brokerAddr, brokerId, responseHeader);
+ final String masterAddress = responseHeader.getNewMasterAddress();
+ final int masterEpoch = responseHeader.getMasterEpoch();
+ final int syncStateSetEpoch = responseHeader.getSyncStateSetEpoch();
+ final NotifyBrokerRoleChangedRequestHeader header = new NotifyBrokerRoleChangedRequestHeader(masterAddress, masterEpoch, syncStateSetEpoch, brokerId);
+ final RemotingCommand notification = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, header);
+ try {
+ // 3000 is presumably the send timeout in milliseconds - confirm against RemotingClient.
+ this.namesrvController.getRemotingClient().invokeOneway(brokerAddr, notification, 3000);
+ } catch (final Exception e) {
+ log.error("Failed to notify broker {} with id {} that role changed", brokerAddr, brokerId, e);
+ }
+ }
+
private List<String> chooseBrokerAddrsToNotify(Map<Long, String> brokerAddrMap, String offlineBrokerAddr) {
if (offlineBrokerAddr != null || brokerAddrMap.size() == 1) {
// notify the reset brokers.