You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/05/16 12:45:02 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: Make thread pool in ReplicasManager can distinguish different brokers
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new 2f6da9e77 Make thread pool in ReplicasManager can distinguish different brokers
2f6da9e77 is described below
commit 2f6da9e77b5dded080a746b8c10546cf457baf6d
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Mon May 16 20:44:33 2022 +0800
Make thread pool in ReplicasManager can distinguish different brokers
---
.../broker/hacontroller/ReplicasManager.java | 26 ++++++++++++----------
1 file changed, 14 insertions(+), 12 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index 66722637b..fd09239ab 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -45,21 +45,21 @@ import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
/**
- * The manager of broker replicas, including:
- * 0.regularly syncing controller metadata, change controller leader address, both master and slave will start this timed task.
- * 1.regularly syncing metadata from controllers, and changing broker roles and master if needed, both master and slave will start this timed task.
- * 2.regularly expanding and Shrinking syncStateSet, only master will start this timed task.
+ * The manager of broker replicas, including: 0.regularly syncing controller metadata, change controller leader address,
+ * both master and slave will start this timed task. 1.regularly syncing metadata from controllers, and changing broker
+ * roles and master if needed, both master and slave will start this timed task. 2.regularly expanding and Shrinking
+ * syncStateSet, only master will start this timed task.
*/
public class ReplicasManager {
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_"));
- private final ExecutorService executorService = Executors.newFixedThreadPool(2, new ThreadFactoryImpl("ReplicasManager_ExecutorService_"));
+ private final ScheduledExecutorService scheduledService;
+ private final ExecutorService executorService;
private final BrokerController brokerController;
private final AutoSwitchHAService haService;
private final BrokerConfig brokerConfig;
private final String localAddress;
- private final BrokerOuterAPI outerAPI;
+ private final BrokerOuterAPI brokerOuterAPI;
private final List<String> controllerAddresses;
private volatile String controllerLeaderAddress = "";
@@ -74,7 +74,9 @@ public class ReplicasManager {
public ReplicasManager(final BrokerController brokerController) {
this.brokerController = brokerController;
- this.outerAPI = brokerController.getBrokerOuterAPI();
+ this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
+ this.scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
+ this.executorService = Executors.newFixedThreadPool(2, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
this.brokerConfig = brokerController.getBrokerConfig();
final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
@@ -210,7 +212,7 @@ public class ReplicasManager {
private boolean registerBroker() {
// Register this broker to controller, get brokerId and masterAddress.
try {
- final BrokerRegisterResponseHeader registerResponse = this.outerAPI.registerBroker(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress);
+ final BrokerRegisterResponseHeader registerResponse = this.brokerOuterAPI.registerBroker(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress);
final String newMasterAddress = registerResponse.getMasterAddress();
this.brokerConfig.setBrokerId(registerResponse.getBrokerId());
if (StringUtils.isNoneEmpty(newMasterAddress)) {
@@ -233,7 +235,7 @@ public class ReplicasManager {
private void schedulingSyncBrokerMetadata() {
this.scheduledService.scheduleAtFixedRate(() -> {
try {
- final Pair<GetReplicaInfoResponseHeader, SyncStateSet> result = this.outerAPI.getReplicaInfo(this.controllerLeaderAddress, this.brokerConfig.getBrokerName());
+ final Pair<GetReplicaInfoResponseHeader, SyncStateSet> result = this.brokerOuterAPI.getReplicaInfo(this.controllerLeaderAddress, this.brokerConfig.getBrokerName());
final GetReplicaInfoResponseHeader info = result.getObject1();
final SyncStateSet syncStateSet = result.getObject2();
final String newMasterAddress = info.getMasterAddress();
@@ -283,7 +285,7 @@ public class ReplicasManager {
private boolean updateControllerMetadata() {
for (String address : this.controllerAddresses) {
try {
- final GetMetaDataResponseHeader responseHeader = this.outerAPI.getControllerMetaData(address);
+ final GetMetaDataResponseHeader responseHeader = this.brokerOuterAPI.getControllerMetaData(address);
if (responseHeader != null && responseHeader.isLeader()) {
this.controllerLeaderAddress = address;
LOGGER.info("Change controller leader address to {}", this.controllerAddresses);
@@ -311,7 +313,7 @@ public class ReplicasManager {
}
}
try {
- final SyncStateSet result = this.outerAPI.alterSyncStateSet(this.controllerLeaderAddress, this.brokerConfig.getBrokerName(), this.masterAddress, this.masterEpoch, newSyncStateSet, this.syncStateSetEpoch);
+ final SyncStateSet result = this.brokerOuterAPI.alterSyncStateSet(this.controllerLeaderAddress, this.brokerConfig.getBrokerName(), this.masterAddress, this.masterEpoch, newSyncStateSet, this.syncStateSetEpoch);
if (result != null) {
changeSyncStateSet(result.getSyncStateSet(), result.getSyncStateSetEpoch());
}