You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/06/07 02:13:47 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: Ensure that new messages cannot be written during failover (#4423)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new fa8ce658c Ensure that new messages cannot be written during failover (#4423)
fa8ce658c is described below

commit fa8ce658c5b0887b2fa87f08d0df3824acb7eb9f
Author: rongtong <ji...@163.com>
AuthorDate: Tue Jun 7 10:13:29 2022 +0800

    Ensure that new messages cannot be written during failover (#4423)
---
 .../apache/rocketmq/broker/hacontroller/ReplicasManager.java   |  8 ++++++++
 .../rocketmq/broker/plugin/AbstractPluginMessageStore.java     |  8 ++++++++
 .../java/org/apache/rocketmq/store/DefaultMessageStore.java    | 10 ++++++++++
 .../src/main/java/org/apache/rocketmq/store/MessageStore.java  | 10 ++++++++++
 4 files changed, 36 insertions(+)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index a25924e84..bb88b6c84 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -152,6 +152,8 @@ public class ReplicasManager {
             if (newMasterEpoch > this.masterEpoch) {
                 LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.localAddress, newMasterEpoch);
 
+                brokerController.getMessageStore().disableWrite();
+
                 // Change record
                 this.masterAddress = this.localAddress;
                 this.masterEpoch = newMasterEpoch;
@@ -172,6 +174,8 @@ public class ReplicasManager {
                 // Notify ha service, change to master
                 this.haService.changeToMaster(newMasterEpoch);
 
+                brokerController.getMessageStore().enableWrite();
+
                 this.executorService.submit(() -> {
                     // Register broker to name-srv
                     try {
@@ -191,6 +195,8 @@ public class ReplicasManager {
             if (newMasterEpoch > this.masterEpoch) {
                 LOGGER.info("Begin to change to slave, brokerName={}, replicas:{}, brokerId={}", this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getBrokerId());
 
+                brokerController.getMessageStore().disableWrite();
+
                 // Change record
                 this.masterAddress = newMasterAddress;
                 this.masterEpoch = newMasterEpoch;
@@ -208,6 +214,8 @@ public class ReplicasManager {
                 // Notify ha service, change to slave
                 this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId());
 
+                brokerController.getMessageStore().enableWrite();
+
                 this.executorService.submit(() -> {
                     // Register broker to name-srv
                     try {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 42542210e..74680979c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -531,4 +531,12 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
     @Override public boolean isShutdown() {
         return next.isShutdown();
     }
+
+    @Override public void disableWrite() {
+        next.disableWrite();
+    }
+
+    @Override public void enableWrite() {
+        next.enableWrite();
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 17561acb3..b08bc7574 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1675,6 +1675,16 @@ public class DefaultMessageStore implements MessageStore {
         return runningFlags;
     }
 
+    @Override
+    public void disableWrite() {
+        runningFlags.getAndMakeNotWriteable();
+    }
+
+    @Override
+    public void enableWrite() {
+        runningFlags.getAndMakeWriteable();
+    }
+
     public void doDispatch(DispatchRequest req) {
         for (CommitLogDispatcher dispatcher : this.dispatcherList) {
             dispatcher.dispatch(req);
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 9dc27faff..b0ca83979 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -796,4 +796,14 @@ public interface MessageStore {
      * @return whether shutdown
      */
     boolean isShutdown();
+
+    /*
+     * Make MessageStore not writeable, default is writeable
+     */
+    void disableWrite();
+
+    /*
+     * Make MessageStore writeable, default is writeable
+     */
+    void enableWrite();
 }