You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/11/24 04:59:53 UTC
[rocketmq] branch develop updated: [ISSUE #5569]Support broker priority election for controller mode (#5548)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 93e82c820 [ISSUE #5569]Support broker priority election for controller mode (#5548)
93e82c820 is described below
commit 93e82c82052691cf73610cb72fc91cc1f648417c
Author: hzh0425 <64...@qq.com>
AuthorDate: Thu Nov 24 12:59:34 2022 +0800
[ISSUE #5569]Support broker priority election for controller mode (#5548)
* Support broker priority election for controller mode
* fix comment of brokerElectionPriority
* Set "The lower the value of brokerElectionPriority, the higher the priority of the broker being selected as the master."
---
.../broker/controller/ReplicasManager.java | 2 +-
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 4 +--
.../broker/controller/ReplicasManagerTest.java | 2 +-
.../org/apache/rocketmq/common/BrokerConfig.java | 15 ++++++++
.../controller/BrokerHeartbeatManager.java | 2 +-
.../apache/rocketmq/controller/BrokerLiveInfo.java | 16 +++++++--
.../controller/elect/impl/DefaultElectPolicy.java | 12 +++++--
.../impl/DefaultBrokerHeartbeatManager.java | 7 ++--
.../processor/ControllerRequestProcessor.java | 3 +-
.../impl/DefaultBrokerHeartbeatManagerTest.java | 2 +-
.../impl/manager/ReplicasInfoManagerTest.java | 41 +++++++++++++++++-----
.../RegisterBrokerToControllerRequestHeader.java | 23 +++++++++---
12 files changed, 101 insertions(+), 28 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 5eceab0bb..a6589d2ea 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -292,7 +292,7 @@ public class ReplicasManager {
try {
final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress,
- this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset());
+ this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset(), this.brokerConfig.getBrokerElectionPriority());
final String newMasterAddress = registerResponse.getMasterAddress();
if (StringUtils.isNoneEmpty(newMasterAddress)) {
if (StringUtils.equals(newMasterAddress, this.localAddress)) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index f3ffc9293..a6853350e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -1161,9 +1161,9 @@ public class BrokerOuterAPI {
*/
public RegisterBrokerToControllerResponseHeader registerBrokerToController(
final String controllerAddress, final String clusterName,
- final String brokerName, final String address, final int epoch, final long maxOffset) throws Exception {
+ final String brokerName, final String address, final int epoch, final long maxOffset, final int electionPriority) throws Exception {
- final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, epoch, maxOffset);
+ final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, epoch, maxOffset, electionPriority);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index 9c08d8e67..84e578db5 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -123,7 +123,7 @@ public class ReplicasManagerTest {
when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS);
when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
- when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyInt(), anyLong())).thenReturn(registerBrokerToControllerResponseHeader);
+ when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
when(brokerOuterAPI.getReplicaInfo(any(), any(), any())).thenReturn(result);
replicasManager = new ReplicasManager(brokerController);
autoSwitchHAService.init(defaultMessageStore);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 92f28d653..ec5986818 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -336,6 +336,13 @@ public class BrokerConfig extends BrokerIdentity {
private long syncControllerMetadataPeriod = 10 * 1000;
+ /**
+ * It is an important basis for the controller to choose the broker master.
+ * The lower the value of brokerElectionPriority, the higher the priority of the broker being selected as the master.
+ * You can set a lower priority for the broker with better machine conditions.
+ */
+ private int brokerElectionPriority = Integer.MAX_VALUE;
+
public enum MetricsExporterType {
DISABLE(0),
OTLP_GRPC(1),
@@ -1422,6 +1429,14 @@ public class BrokerConfig extends BrokerIdentity {
this.syncControllerMetadataPeriod = syncControllerMetadataPeriod;
}
+ public int getBrokerElectionPriority() {
+ return brokerElectionPriority;
+ }
+
+ public void setBrokerElectionPriority(int brokerElectionPriority) {
+ this.brokerElectionPriority = brokerElectionPriority;
+ }
+
public boolean isRecoverConcurrently() {
return recoverConcurrently;
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index 364b32647..fd41aa21a 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -49,7 +49,7 @@ public interface BrokerHeartbeatManager {
* Register new broker to heartManager.
*/
void registerBroker(final String clusterName, final String brokerName, final String brokerAddr, final long brokerId,
- final Long timeoutMillis, final Channel channel, final Integer epoch, final Long maxOffset);
+ final Long timeoutMillis, final Channel channel, final Integer epoch, final Long maxOffset, final Integer electionPriority);
/**
* Broker channel close
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
index e88b26c39..faaf298d2 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
@@ -31,9 +31,10 @@ public class BrokerLiveInfo {
private int epoch;
private long maxOffset;
private long confirmOffset;
+ private Integer electionPriority;
public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
- Channel channel, int epoch, long maxOffset) {
+ Channel channel, int epoch, long maxOffset, Integer electionPriority) {
this.brokerName = brokerName;
this.brokerAddr = brokerAddr;
this.brokerId = brokerId;
@@ -41,10 +42,12 @@ public class BrokerLiveInfo {
this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
this.channel = channel;
this.epoch = epoch;
+ this.electionPriority = electionPriority;
this.maxOffset = maxOffset;
}
+
public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
- Channel channel, int epoch, long maxOffset, long confirmOffset) {
+ Channel channel, int epoch, long maxOffset, Integer electionPriority, long confirmOffset) {
this.brokerName = brokerName;
this.brokerAddr = brokerAddr;
this.brokerId = brokerId;
@@ -53,6 +56,7 @@ public class BrokerLiveInfo {
this.channel = channel;
this.epoch = epoch;
this.maxOffset = maxOffset;
+ this.electionPriority = electionPriority;
this.confirmOffset = confirmOffset;
}
@@ -123,6 +127,14 @@ public class BrokerLiveInfo {
this.confirmOffset = confirmOffset;
}
+ public void setElectionPriority(Integer electionPriority) {
+ this.electionPriority = electionPriority;
+ }
+
+ public Integer getElectionPriority() {
+ return electionPriority;
+ }
+
public long getConfirmOffset() {
return confirmOffset;
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
index 7af029b98..00cac1627 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
@@ -36,8 +36,14 @@ public class DefaultElectPolicy implements ElectPolicy {
// <clusterName, brokerAddr, BrokerLiveInfo>, Used to obtain the BrokerLiveInfo information of a broker
private BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter;
- private final Comparator<BrokerLiveInfo> comparator = (x, y) -> {
- return x.getEpoch() == y.getEpoch() ? (int) (y.getMaxOffset() - x.getMaxOffset()) : y.getEpoch() - x.getEpoch();
+ // Sort in descending order according to<epoch, offset>, and sort in ascending order according to priority
+ private final Comparator<BrokerLiveInfo> comparator = (o1, o2) -> {
+ if (o1.getEpoch() == o2.getEpoch()) {
+ return o1.getMaxOffset() == o2.getMaxOffset() ? o1.getElectionPriority() - o2.getElectionPriority() :
+ (int) (o2.getMaxOffset() - o1.getMaxOffset());
+ } else {
+ return o2.getEpoch() - o1.getEpoch();
+ }
};
public DefaultElectPolicy(BiPredicate<String, String> validPredicate, BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter) {
@@ -55,7 +61,7 @@ public class DefaultElectPolicy implements ElectPolicy {
* - Filter alive brokers by 'validPredicate'.
* - Check whether the old master is still valid.
* - If preferBrokerAddr is not empty and valid, select it as master.
- * - Otherwise, we will sort the array of 'brokerLiveInfo' according to (epoch, offset), and select the best candidate as the new master.
+ * - Otherwise, we will sort the array of 'brokerLiveInfo' according to (epoch, offset, electionPriority), and select the best candidate as the new master.
*
* @param clusterName the brokerGroup belongs
* @param syncStateBrokers all broker replicas in syncStateSet
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index 94a43fa26..eabae152b 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -101,7 +101,7 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
@Override
public void registerBroker(String clusterName, String brokerName, String brokerAddr,
- long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset) {
+ long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, final Integer electionPriority) {
final BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
final BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(addrInfo,
new BrokerLiveInfo(brokerName,
@@ -109,7 +109,10 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
brokerId,
System.currentTimeMillis(),
timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
- channel, epoch == null ? -1 : epoch, maxOffset == null ? -1 : maxOffset));
+ channel,
+ epoch == null ? -1 : epoch,
+ maxOffset == null ? -1 : maxOffset,
+ electionPriority == null ? Integer.MAX_VALUE : electionPriority));
if (prevBrokerLiveInfo == null) {
log.info("new broker registered, {}, brokerId:{}", addrInfo, brokerId);
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 04caec015..4cbc1140e 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -114,7 +114,8 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
final RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
this.heartbeatManager.registerBroker(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
- responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(), controllerRequest.getEpoch(), controllerRequest.getMaxOffset());
+ responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
+ controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getElectionPriority());
}
return response;
}
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index b8a151ea0..dd0c60a65 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -43,7 +43,7 @@ public class DefaultBrokerHeartbeatManagerTest {
this.heartbeatManager.addBrokerLifecycleListener((clusterName, brokerName, brokerAddress, brokerId) -> {
latch.countDown();
});
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:7000", 1L, 3000L, null, 1, 1L);
+ this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:7000", 1L, 3000L, null, 1, 1L, 0);
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
this.heartbeatManager.shutdown();
}
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 0b7faafc0..270f98089 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -126,29 +126,38 @@ public class ReplicasInfoManagerTest {
public void mockHeartbeatDataMasterStillAlive() {
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, 10000000000L, null,
- 1, 3L);
+ 1, 3L, 0);
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
- 1, 2L);
+ 1, 2L, 0);
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
- 1, 3L);
+ 1, 3L, 0);
}
public void mockHeartbeatDataHigherEpoch() {
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
- 1, 3L);
+ 1, 3L, 0);
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
- 1, 2L);
+ 1, 2L, 0);
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
- 0, 3L);
+ 0, 3L, 0);
}
public void mockHeartbeatDataHigherOffset() {
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
- 1, 3L);
+ 1, 3L, 0);
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
- 1, 2L);
+ 1, 2L, 0);
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
- 1, 3L);
+ 1, 3L, 0);
+ }
+
+ public void mockHeartbeatDataHigherPriority() {
+ this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
+ 1, 3L, 3);
+ this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
+ 1, 3L, 2);
+ this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
+ 1, 3L, 1);
}
@Test
@@ -190,6 +199,20 @@ public class ReplicasInfoManagerTest {
assertEquals("127.0.0.1:9002", response.getNewMasterAddress());
}
+ @Test
+ public void testElectMasterPreferHigherPriorityWhenEpochAndOffsetEquals() {
+ mockMetaData();
+ final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+ ElectPolicy electPolicy = new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo);
+ mockHeartbeatDataHigherPriority();
+ final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request,
+ electPolicy);
+ final ElectMasterResponseHeader response = cResult.getResponse();
+ assertEquals(response.getMasterEpoch(), 2);
+ assertFalse(response.getNewMasterAddress().isEmpty());
+ assertEquals("127.0.0.1:9002", response.getNewMasterAddress());
+ }
+
@Test
public void testElectMaster() {
mockMetaData();
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
index 81ed03d48..a8e745e4f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
@@ -30,23 +30,27 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
private Long maxOffset;
@CFNullable
private Long heartbeatTimeoutMillis;
-
+ @CFNullable
+ private Integer electionPriority;
public RegisterBrokerToControllerRequestHeader() {
}
public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress) {
- this.clusterName = clusterName;
- this.brokerName = brokerName;
- this.brokerAddress = brokerAddress;
+ this(clusterName, brokerName, brokerAddress, 0);
}
- public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, int epoch, long maxOffset) {
+ public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, int electionPriority) {
+ this(clusterName, brokerName, brokerAddress, 0, 0, electionPriority);
+ }
+
+ public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, int epoch, long maxOffset, int electionPriority) {
this.clusterName = clusterName;
this.brokerName = brokerName;
this.brokerAddress = brokerAddress;
this.epoch = epoch;
this.maxOffset = maxOffset;
+ this.electionPriority = electionPriority;
}
public String getClusterName() {
@@ -81,6 +85,14 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
}
+ public Integer getElectionPriority() {
+ return electionPriority;
+ }
+
+ public void setElectionPriority(Integer electionPriority) {
+ this.electionPriority = electionPriority;
+ }
+
@Override
public String toString() {
return "RegisterBrokerToControllerRequestHeader{" +
@@ -90,6 +102,7 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
", epoch=" + epoch +
", maxOffset=" + maxOffset +
", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
+ ", electionPriority=" + electionPriority +
'}';
}