You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/03/31 06:55:27 UTC

[rocketmq] branch develop updated: [ISSUE #6508] Prohibit writing and reading before confirming the broker role in ha mode (#6509)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 526a10755 [ISSUE #6508] Prohibit writing and reading before confirming the broker role in ha mode (#6509)
526a10755 is described below

commit 526a107557a25b6e6101c247a1d9dbe06e3ff074
Author: fujian-zfj <25...@qq.com>
AuthorDate: Fri Mar 31 14:55:19 2023 +0800

    [ISSUE #6508] Prohibit writing and reading before confirming the broker role in ha mode (#6509)
    
    * typo in readme[ecosystem]
    
    * disable RW before confirming broker role
    
    * fix comment
    
    * set isolated and brokerPermission when broker role is confirmed
    
    * set isolated and brokerPermission when broker role is confirmed
    
    * remove unused import
    
    * unified in replicasManager
    
    * Add volatile to originalBrokerPermission var
    
    * Remove useless originalBrokerPermission
    
    ---------
    
    Co-authored-by: RongtongJin <ji...@mails.ucas.ac.cn>
---
 .../apache/rocketmq/broker/BrokerController.java   |  7 +++--
 .../broker/controller/ReplicasManager.java         | 32 ++++++++++++++++------
 2 files changed, 28 insertions(+), 11 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9a92daf09..70e59e098 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1553,10 +1553,14 @@ public class BrokerController {
 
         this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();
 
-        if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster() || this.brokerConfig.isEnableControllerMode()) {
+        if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster()) {
             isIsolated = true;
         }
 
+        if (this.brokerConfig.isEnableControllerMode()) {
+            this.replicasManager.setIsolatedAndBrokerPermission(false);
+        }
+
         if (this.brokerOuterAPI != null) {
             this.brokerOuterAPI.start();
         }
@@ -2301,5 +2305,4 @@ public class BrokerController {
     public BlockingQueue<Runnable> getAdminBrokerThreadPoolQueue() {
         return adminBrokerThreadPoolQueue;
     }
-
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 5bdd1dbe9..c39e33ad1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -93,6 +93,8 @@ public class ReplicasManager {
 
     private Long masterBrokerId;
 
+    private volatile int originalBrokerPermission = 0;
+
     private BrokerMetadata brokerMetadata;
 
     private TempBrokerMetadata tempBrokerMetadata;
@@ -205,7 +207,7 @@ public class ReplicasManager {
             if (this.masterBrokerId != null || brokerElect()) {
                 LOGGER.info("Master in this broker set is elected, masterBrokerId: {}, masterBrokerAddr: {}", this.masterBrokerId, this.masterAddress);
                 this.state = State.RUNNING;
-                this.brokerController.setIsolated(false);
+                setIsolatedAndBrokerPermission(true);
                 LOGGER.info("All register process has been done, change state to: {}", this.state);
             } else {
                 return false;
@@ -261,10 +263,6 @@ public class ReplicasManager {
                 final HashSet<Long> newSyncStateSet = new HashSet<>(syncStateSet);
                 changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
 
-                // Change record
-                this.masterAddress = this.brokerAddress;
-                this.masterBrokerId = this.brokerControllerId;
-
                 // Handle the slave synchronise
                 handleSlaveSynchronize(BrokerRole.SYNC_MASTER);
 
@@ -275,6 +273,10 @@ public class ReplicasManager {
                 this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SYNC_MASTER);
                 this.brokerController.changeSpecialServiceStatus(true);
 
+                // Change record
+                this.masterAddress = this.brokerAddress;
+                this.masterBrokerId = this.brokerControllerId;
+
                 schedulingCheckSyncStateSet();
 
                 this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(newMasterEpoch);
@@ -299,10 +301,6 @@ public class ReplicasManager {
                     return;
                 }
 
-                // Change record
-                this.masterAddress = newMasterAddress;
-                this.masterBrokerId = newMasterBrokerId;
-
                 // Stop checking syncStateSet because only master is able to check
                 stopCheckSyncStateSet();
 
@@ -312,6 +310,10 @@ public class ReplicasManager {
                 // The brokerId in brokerConfig just means its role(master[0] or slave[>=1])
                 this.brokerConfig.setBrokerId(brokerControllerId);
 
+                // Change record
+                this.masterAddress = newMasterAddress;
+                this.masterBrokerId = newMasterBrokerId;
+
                 // Handle the slave synchronise
                 handleSlaveSynchronize(BrokerRole.SLAVE);
 
@@ -872,4 +874,16 @@ public class ReplicasManager {
     public TempBrokerMetadata getTempBrokerMetadata() {
         return tempBrokerMetadata;
     }
+
+    public void setIsolatedAndBrokerPermission(boolean isBrokerRoleConfirmed) { // switches the isolated flag and the broker permission together
+        if (isBrokerRoleConfirmed) {
+            this.brokerController.setIsolated(false);
+            this.brokerConfig.setBrokerPermission(this.originalBrokerPermission); // restore the value captured in the else branch; the field starts at 0, so this assumes the else branch ran first
+        } else {
+            // Prohibit writing and reading until the broker role has been confirmed.
+            this.brokerController.setIsolated(true);
+            this.originalBrokerPermission = this.brokerConfig.getBrokerPermission(); // NOTE(review): a second call on this branch would presumably save the 0 set below as the "original" - confirm callers invoke it only once
+            this.brokerConfig.setBrokerPermission(0);
+        }
+    }
 }