You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/05/16 12:45:02 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: Make thread pool in ReplicasManager can distinguish different brokers

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new 2f6da9e77 Make thread pool in ReplicasManager can distinguish different brokers
2f6da9e77 is described below

commit 2f6da9e77b5dded080a746b8c10546cf457baf6d
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Mon May 16 20:44:33 2022 +0800

    Make thread pool in ReplicasManager can distinguish different brokers
---
 .../broker/hacontroller/ReplicasManager.java       | 26 ++++++++++++----------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index 66722637b..fd09239ab 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -45,21 +45,21 @@ import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
 
 /**
- * The manager of broker replicas, including:
- * 0.regularly syncing controller metadata, change controller leader address, both master and slave will start this timed task.
- * 1.regularly syncing metadata from controllers, and changing broker roles and master if needed, both master and slave will start this timed task.
- * 2.regularly expanding and Shrinking syncStateSet, only master will start this timed task.
+ * The manager of broker replicas, including: (0) regularly syncing controller metadata and changing the controller
+ * leader address; both master and slave start this timed task. (1) regularly syncing metadata from controllers, and
+ * changing broker roles and master if needed; both master and slave start this timed task. (2) regularly expanding
+ * and shrinking syncStateSet; only the master starts this timed task.
  */
 public class ReplicasManager {
     private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
-    private final ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_"));
-    private final ExecutorService executorService = Executors.newFixedThreadPool(2, new ThreadFactoryImpl("ReplicasManager_ExecutorService_"));
+    private final ScheduledExecutorService scheduledService;
+    private final ExecutorService executorService;
     private final BrokerController brokerController;
     private final AutoSwitchHAService haService;
     private final BrokerConfig brokerConfig;
     private final String localAddress;
-    private final BrokerOuterAPI outerAPI;
+    private final BrokerOuterAPI brokerOuterAPI;
     private final List<String> controllerAddresses;
 
     private volatile String controllerLeaderAddress = "";
@@ -74,7 +74,9 @@ public class ReplicasManager {
 
     public ReplicasManager(final BrokerController brokerController) {
         this.brokerController = brokerController;
-        this.outerAPI = brokerController.getBrokerOuterAPI();
+        this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
+        this.scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
+        this.executorService = Executors.newFixedThreadPool(2, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
         this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
         this.brokerConfig = brokerController.getBrokerConfig();
         final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
@@ -210,7 +212,7 @@ public class ReplicasManager {
     private boolean registerBroker() {
         // Register this broker to controller, get brokerId and masterAddress.
         try {
-            final BrokerRegisterResponseHeader registerResponse = this.outerAPI.registerBroker(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress);
+            final BrokerRegisterResponseHeader registerResponse = this.brokerOuterAPI.registerBroker(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress);
             final String newMasterAddress = registerResponse.getMasterAddress();
             this.brokerConfig.setBrokerId(registerResponse.getBrokerId());
             if (StringUtils.isNoneEmpty(newMasterAddress)) {
@@ -233,7 +235,7 @@ public class ReplicasManager {
     private void schedulingSyncBrokerMetadata() {
         this.scheduledService.scheduleAtFixedRate(() -> {
             try {
-                final Pair<GetReplicaInfoResponseHeader, SyncStateSet> result = this.outerAPI.getReplicaInfo(this.controllerLeaderAddress, this.brokerConfig.getBrokerName());
+                final Pair<GetReplicaInfoResponseHeader, SyncStateSet> result = this.brokerOuterAPI.getReplicaInfo(this.controllerLeaderAddress, this.brokerConfig.getBrokerName());
                 final GetReplicaInfoResponseHeader info = result.getObject1();
                 final SyncStateSet syncStateSet = result.getObject2();
                 final String newMasterAddress = info.getMasterAddress();
@@ -283,7 +285,7 @@ public class ReplicasManager {
     private boolean updateControllerMetadata() {
         for (String address : this.controllerAddresses) {
             try {
-                final GetMetaDataResponseHeader responseHeader = this.outerAPI.getControllerMetaData(address);
+                final GetMetaDataResponseHeader responseHeader = this.brokerOuterAPI.getControllerMetaData(address);
                 if (responseHeader != null && responseHeader.isLeader()) {
                     this.controllerLeaderAddress = address;
                     LOGGER.info("Change controller leader address to {}", this.controllerAddresses);
@@ -311,7 +313,7 @@ public class ReplicasManager {
                 }
             }
             try {
-                final SyncStateSet result = this.outerAPI.alterSyncStateSet(this.controllerLeaderAddress, this.brokerConfig.getBrokerName(), this.masterAddress, this.masterEpoch, newSyncStateSet, this.syncStateSetEpoch);
+                final SyncStateSet result = this.brokerOuterAPI.alterSyncStateSet(this.controllerLeaderAddress, this.brokerConfig.getBrokerName(), this.masterAddress, this.masterEpoch, newSyncStateSet, this.syncStateSetEpoch);
                 if (result != null) {
                     changeSyncStateSet(result.getSyncStateSet(), result.getSyncStateSetEpoch());
                 }