You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/06/07 02:13:47 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: Ensure that new messages cannot be written during failover (#4423)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new fa8ce658c Ensure that new messages cannot be written during failover (#4423)
fa8ce658c is described below
commit fa8ce658c5b0887b2fa87f08d0df3824acb7eb9f
Author: rongtong <ji...@163.com>
AuthorDate: Tue Jun 7 10:13:29 2022 +0800
Ensure that new messages cannot be written during failover (#4423)
---
.../apache/rocketmq/broker/hacontroller/ReplicasManager.java | 8 ++++++++
.../rocketmq/broker/plugin/AbstractPluginMessageStore.java | 8 ++++++++
.../java/org/apache/rocketmq/store/DefaultMessageStore.java | 10 ++++++++++
.../src/main/java/org/apache/rocketmq/store/MessageStore.java | 10 ++++++++++
4 files changed, 36 insertions(+)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index a25924e84..bb88b6c84 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -152,6 +152,8 @@ public class ReplicasManager {
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.localAddress, newMasterEpoch);
+ brokerController.getMessageStore().disableWrite();
+
// Change record
this.masterAddress = this.localAddress;
this.masterEpoch = newMasterEpoch;
@@ -172,6 +174,8 @@ public class ReplicasManager {
// Notify ha service, change to master
this.haService.changeToMaster(newMasterEpoch);
+ brokerController.getMessageStore().enableWrite();
+
this.executorService.submit(() -> {
// Register broker to name-srv
try {
@@ -191,6 +195,8 @@ public class ReplicasManager {
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to slave, brokerName={}, replicas:{}, brokerId={}", this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getBrokerId());
+ brokerController.getMessageStore().disableWrite();
+
// Change record
this.masterAddress = newMasterAddress;
this.masterEpoch = newMasterEpoch;
@@ -208,6 +214,8 @@ public class ReplicasManager {
// Notify ha service, change to slave
this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId());
+ brokerController.getMessageStore().enableWrite();
+
this.executorService.submit(() -> {
// Register broker to name-srv
try {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 42542210e..74680979c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -531,4 +531,12 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
@Override public boolean isShutdown() {
return next.isShutdown();
}
+
+ @Override public void disableWrite() {
+ next.disableWrite();
+ }
+
+ @Override public void enableWrite() {
+ next.enableWrite();
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 17561acb3..b08bc7574 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1675,6 +1675,16 @@ public class DefaultMessageStore implements MessageStore {
return runningFlags;
}
+ @Override
+ public void disableWrite() {
+ runningFlags.getAndMakeNotWriteable();
+ }
+
+ @Override
+ public void enableWrite() {
+ runningFlags.getAndMakeWriteable();
+ }
+
public void doDispatch(DispatchRequest req) {
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 9dc27faff..b0ca83979 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -796,4 +796,14 @@ public interface MessageStore {
* @return whether shutdown
*/
boolean isShutdown();
+
+ /*
+ * Make MessageStore not writeable, default is writeable
+ */
+ void disableWrite();
+
+ /*
+ * Make MessageStore writeable, default is writeable
+ */
+ void enableWrite();
}