You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/03/31 06:55:27 UTC
[rocketmq] branch develop updated: [ISSUE #6508] Prohibit writing and reading before confirming the broker role in ha mode (#6509)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 526a10755 [ISSUE #6508] Prohibit writing and reading before confirming the broker role in ha mode (#6509)
526a10755 is described below
commit 526a107557a25b6e6101c247a1d9dbe06e3ff074
Author: fujian-zfj <25...@qq.com>
AuthorDate: Fri Mar 31 14:55:19 2023 +0800
[ISSUE #6508] Prohibit writing and reading before confirming the broker role in ha mode (#6509)
* typo int readme[ecosystem]
* disable RW before confirming broker role
* fix comment
* set isolated and brokerPermission when broker role is confirmed
* set isolated and brokerPermission when broker role is confirmed
* remove unused import
* unified in replicasManager
* Add volatile to originalBrokerPermission var
* Remove useless originalBrokerPermission
---------
Co-authored-by: RongtongJin <ji...@mails.ucas.ac.cn>
---
.../apache/rocketmq/broker/BrokerController.java | 7 +++--
.../broker/controller/ReplicasManager.java | 32 ++++++++++++++++------
2 files changed, 28 insertions(+), 11 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9a92daf09..70e59e098 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1553,10 +1553,14 @@ public class BrokerController {
this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();
- if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster() || this.brokerConfig.isEnableControllerMode()) {
+ if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster()) {
isIsolated = true;
}
+ if (this.brokerConfig.isEnableControllerMode()) {
+ this.replicasManager.setIsolatedAndBrokerPermission(false);
+ }
+
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
@@ -2301,5 +2305,4 @@ public class BrokerController {
public BlockingQueue<Runnable> getAdminBrokerThreadPoolQueue() {
return adminBrokerThreadPoolQueue;
}
-
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 5bdd1dbe9..c39e33ad1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -93,6 +93,8 @@ public class ReplicasManager {
private Long masterBrokerId;
+ private volatile int originalBrokerPermission = 0;
+
private BrokerMetadata brokerMetadata;
private TempBrokerMetadata tempBrokerMetadata;
@@ -205,7 +207,7 @@ public class ReplicasManager {
if (this.masterBrokerId != null || brokerElect()) {
LOGGER.info("Master in this broker set is elected, masterBrokerId: {}, masterBrokerAddr: {}", this.masterBrokerId, this.masterAddress);
this.state = State.RUNNING;
- this.brokerController.setIsolated(false);
+ setIsolatedAndBrokerPermission(true);
LOGGER.info("All register process has been done, change state to: {}", this.state);
} else {
return false;
@@ -261,10 +263,6 @@ public class ReplicasManager {
final HashSet<Long> newSyncStateSet = new HashSet<>(syncStateSet);
changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
- // Change record
- this.masterAddress = this.brokerAddress;
- this.masterBrokerId = this.brokerControllerId;
-
// Handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SYNC_MASTER);
@@ -275,6 +273,10 @@ public class ReplicasManager {
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SYNC_MASTER);
this.brokerController.changeSpecialServiceStatus(true);
+ // Change record
+ this.masterAddress = this.brokerAddress;
+ this.masterBrokerId = this.brokerControllerId;
+
schedulingCheckSyncStateSet();
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(newMasterEpoch);
@@ -299,10 +301,6 @@ public class ReplicasManager {
return;
}
- // Change record
- this.masterAddress = newMasterAddress;
- this.masterBrokerId = newMasterBrokerId;
-
// Stop checking syncStateSet because only master is able to check
stopCheckSyncStateSet();
@@ -312,6 +310,10 @@ public class ReplicasManager {
// The brokerId in brokerConfig just means its role(master[0] or slave[>=1])
this.brokerConfig.setBrokerId(brokerControllerId);
+ // Change record
+ this.masterAddress = newMasterAddress;
+ this.masterBrokerId = newMasterBrokerId;
+
// Handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SLAVE);
@@ -872,4 +874,16 @@ public class ReplicasManager {
public TempBrokerMetadata getTempBrokerMetadata() {
return tempBrokerMetadata;
}
+
+ public void setIsolatedAndBrokerPermission(boolean isBrokerRoleConfirmed) {
+ if (isBrokerRoleConfirmed) {
+ this.brokerController.setIsolated(false);
+ this.brokerConfig.setBrokerPermission(this.originalBrokerPermission);
+ } else {
+ // prohibit writing and reading before confirming the broker role
+ this.brokerController.setIsolated(true);
+ this.originalBrokerPermission = this.brokerConfig.getBrokerPermission();
+ this.brokerConfig.setBrokerPermission(0);
+ }
+ }
}