You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/02/07 01:14:30 UTC

[rocketmq] 04/14: feat(broker): implement the general register to controller protocol

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 468a5d85dd9a6762e63a6fcb06aafac472cbfa40
Author: TheR1sing3un <th...@163.com>
AuthorDate: Sun Feb 5 13:11:51 2023 +0800

    feat(broker): implement the general register to controller protocol
    
    1. implement the general register to controller protocol
---
 .../broker/controller/ReplicasManager.java         | 182 ++++++++++++++++++++-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  40 +++++
 .../rocketmq/client/impl/MQClientAPIImpl.java      |   4 +-
 .../rocketmq/controller/ControllerManager.java     |   3 +
 .../processor/ControllerRequestProcessor.java      |   9 +
 .../rocketmq/remoting/protocol/RequestCode.java    |   6 +
 .../rocketmq/remoting/protocol/ResponseCode.java   |   2 +
 .../register/ApplyBrokerIdRequestHeader.java       |   7 +
 .../register/ApplyBrokerIdResponseHeader.java      |  10 +-
 .../register/GetNextBrokerIdResponseHeader.java    |   4 +
 ...ader.java => RegisterSuccessRequestHeader.java} |  13 +-
 ...der.java => RegisterSuccessResponseHeader.java} |  11 +-
 .../rocketmq/store/config/MessageStoreConfig.java  |  20 +++
 .../store/ha/autoswitch/BrokerMetadata.java        |  82 ++++++++++
 .../rocketmq/store/ha/autoswitch/MetadataFile.java |  60 +++++++
 .../store/ha/autoswitch/TempBrokerMetadata.java    |  92 +++++++++++
 16 files changed, 519 insertions(+), 26 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 8a03015f7..e3c9382f8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -47,9 +47,14 @@ import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+import org.apache.rocketmq.store.ha.autoswitch.BrokerMetadata;
+import org.apache.rocketmq.store.ha.autoswitch.TempBrokerMetadata;
 
 import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_METADATA_NOT_EXIST;
 
@@ -78,6 +83,8 @@ public class ReplicasManager {
     private volatile String controllerLeaderAddress = "";
     private volatile State state = State.INITIAL;
 
+    private RegisterState registerState = RegisterState.INITIAL;
+
     private ScheduledFuture<?> checkSyncStateSetTaskFuture;
     private ScheduledFuture<?> slaveSyncFuture;
 
@@ -85,6 +92,10 @@ public class ReplicasManager {
 
     private Long masterBrokerId;
 
+    private BrokerMetadata brokerMetadata;
+
+    private TempBrokerMetadata tempBrokerMetadata;
+
     private Set<Long> syncStateSet;
     private int syncStateSetEpoch = 0;
     private String masterAddress = "";
@@ -103,6 +114,8 @@ public class ReplicasManager {
         this.availableControllerAddresses = new ConcurrentHashMap<>();
         this.syncStateSet = new HashSet<>();
         this.localAddress = brokerController.getBrokerAddr();
+        this.brokerMetadata = new BrokerMetadata(this.brokerController.getMessageStoreConfig().getStorePathMetadata());
+        this.tempBrokerMetadata = new TempBrokerMetadata(this.brokerController.getMessageStoreConfig().getStorePathTempMetadata());
     }
 
     public long getConfirmOffset() {
@@ -113,12 +126,22 @@ public class ReplicasManager {
         INITIAL,
         FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
 
-        FIRST_TIME_WAIT_MASTER_IS_ELECTED,
+        FIRST_TIME_REGISTER_TO_CONTROLLER_DONE,
 
         RUNNING,
         SHUTDOWN,
     }
 
+    enum RegisterState {
+        INITIAL,
+
+        CREATE_TEMP_METADATA_FILE_DONE,
+
+        CREATE_METADATA_FILE_DONE,
+
+        REGISTERED
+    }
+
     public void start() {
         updateControllerAddr();
         scanAvailableControllerAddresses();
@@ -157,13 +180,13 @@ public class ReplicasManager {
         if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
             if (registerBrokerToController()) {
                 LOGGER.info("First time register broker success");
-                this.state = State.FIRST_TIME_WAIT_MASTER_IS_ELECTED;
+                this.state = State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE;
             } else {
                 return false;
             }
         }
 
-        if (this.state == State.FIRST_TIME_WAIT_MASTER_IS_ELECTED) {
+        if (this.state == State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE) {
             if (StringUtils.isNotEmpty(this.masterAddress) || brokerElect()) {
                 LOGGER.info("Master in this broker set is elected");
                 this.state = State.RUNNING;
@@ -343,7 +366,7 @@ public class ReplicasManager {
     }
 
     private boolean registerBrokerToController() {
-        // Register this broker to controller, get brokerId and masterAddress.
+        // Register this broker to controller to get a stable and credible broker id, and persist metadata to local file.
         try {
             final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
                 this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getControllerHeartBeatTimeoutMills(),
@@ -373,6 +396,157 @@ public class ReplicasManager {
         }
     }
 
+    /**
+     * Register broker to controller, and persist the metadata to file
+     * @return whether registering process succeeded
+     */
+    private boolean registerBrokerToController2() {
+        try {
+            // 1. determine the current registering state
+            confirmNowRegisteringState();
+            // 2. get the next brokerId to be assigned, and create the temp metadata file
+            if (this.registerState == RegisterState.INITIAL) {
+                Long nextBrokerId = getNextBrokerId();
+                if (nextBrokerId == null || !createTempMetadataFile(nextBrokerId)) {
+                    return false;
+                }
+                this.registerState = RegisterState.CREATE_TEMP_METADATA_FILE_DONE;
+            }
+            // 3. apply brokerId to controller, and create metadata file
+            if (this.registerState == RegisterState.CREATE_TEMP_METADATA_FILE_DONE) {
+                if (!applyBrokerId()) {
+                    // applying the broker id failed (for example, this brokerId has already been used)
+                    // delete the temp metadata file
+                    this.tempBrokerMetadata.clear();
+                    // back to the first step
+                    this.registerState = RegisterState.INITIAL;
+                    return false;
+                }
+                if (!createMetadataFileAndDeleteTemp()) {
+                    return false;
+                }
+                this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
+            }
+            // 4. register success
+            if (this.registerState == RegisterState.CREATE_METADATA_FILE_DONE) {
+                if (!registerSuccess()) {
+                    return false;
+                }
+                this.registerState = RegisterState.REGISTERED;
+            }
+            return true;
+        } catch (final Exception e) {
+            LOGGER.error("Failed to register broker to controller", e);
+            return false;
+        }
+    }
+
+    /**
+     * Send a GetNextBrokerId request to the controller to get the next brokerId to be assigned in this broker-set
+     * @return the next brokerId in this broker-set, or null if the request failed
+     */
+    private Long getNextBrokerId() {
+        try {
+            GetNextBrokerIdResponseHeader nextBrokerIdResp = this.brokerOuterAPI.getNextBrokerId(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.controllerLeaderAddress);
+            return nextBrokerIdResp.getNextBrokerId();
+        } catch (Exception e) {
+            LOGGER.error("fail to get next broker id from controller", e);
+            return null;
+        }
+    }
+
+    /**
+     * Create the temp metadata file in the local file system, recording the brokerId and registerCheckCode
+     * @param brokerId the brokerId that is expected to be assigned
+     * @return whether the temp metadata file was created successfully
+     */
+
+    private boolean createTempMetadataFile(Long brokerId) {
+        // generate the register check code, in the format: $localAddress;$currentTimeMillis
+        String registerCheckCode = this.localAddress + ";" + System.currentTimeMillis();
+        try {
+            this.tempBrokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerId, registerCheckCode);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("update and persist temp broker metadata file failed", e);
+            this.tempBrokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Send applyBrokerId request to controller
+     * @return whether controller has assigned this brokerId for this broker
+     */
+    private boolean applyBrokerId() {
+        try {
+            ApplyBrokerIdResponseHeader response = this.brokerOuterAPI.applyBrokerId(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(),
+                    tempBrokerMetadata.getBrokerId(), tempBrokerMetadata.getRegisterCheckCode(), this.controllerLeaderAddress);
+            return true;
+
+        } catch (Exception e) {
+            LOGGER.error("fail to apply broker id", e);
+            return false;
+        }
+    }
+
+    /**
+     * Create the metadata file and delete the temp metadata file
+     * @return whether the process succeeded
+     */
+    private boolean createMetadataFileAndDeleteTemp() {
+        // create metadata file and delete temp metadata file
+        try {
+            this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
+            this.tempBrokerMetadata.clear();
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("fail to create metadata file", e);
+            this.brokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Send a registerSuccess request to inform the controller that this broker has been registered successfully and that the controller should update the broker's ipAddress if it has changed
+     * @return whether the request succeeded
+     */
+    private boolean registerSuccess() {
+        try {
+            RegisterSuccessResponseHeader response = this.brokerOuterAPI.registerSuccess(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerId, localAddress, controllerLeaderAddress);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("fail to send registerSuccess request to controller", e);
+            return false;
+        }
+    }
+
+    /**
+     * Determine the current registering state from the local metadata files
+     */
+    private void confirmNowRegisteringState() {
+        // 1. check whether the metadata exists
+        try {
+            this.brokerMetadata.readFromFile();
+        } catch (Exception e) {
+            LOGGER.error("read metadata file failed", e);
+        }
+        if (this.brokerMetadata.isLoaded()) {
+            this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
+            this.brokerId = brokerMetadata.getBrokerId();
+            return;
+        }
+        // 2. check whether the temp metadata exists
+        try {
+            this.tempBrokerMetadata.readFromFile();
+        } catch (Exception e) {
+            LOGGER.error("read temp metadata file failed", e);
+        }
+        if (this.tempBrokerMetadata.isLoaded()) {
+            this.registerState = RegisterState.CREATE_TEMP_METADATA_FILE_DONE;
+        }
+    }
+
     /**
      * Scheduling sync broker metadata form controller.
      */
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index f123549cf..82e9ea33e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -107,8 +107,14 @@ import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSet
 import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionRequestHeader;
@@ -130,6 +136,7 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint;
 import org.apache.rocketmq.store.timer.TimerMetrics;
 
 import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
+import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_ID_INVALID;
 import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_METADATA_NOT_EXIST;
 import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED;
 import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_ELECT_MASTER_FAILED;
@@ -1213,6 +1220,39 @@ public class BrokerOuterAPI {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
+    public GetNextBrokerIdResponseHeader getNextBrokerId(final String clusterName, final String brokerName, final String controllerAddress) throws Exception {
+        final GetNextBrokerIdRequestHeader requestHeader = new GetNextBrokerIdRequestHeader(clusterName, brokerName);
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_NEXT_BROKER_ID, requestHeader);
+        final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
+        assert response != null;
+        if (response.getCode() == SUCCESS) {
+            return (GetNextBrokerIdResponseHeader) response.decodeCommandCustomHeader(GetNextBrokerIdResponseHeader.class);
+        }
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+    public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, final String brokerName, final Long brokerId, final String registerCheckCode, final String controllerAddress) throws Exception {
+        final ApplyBrokerIdRequestHeader requestHeader = new ApplyBrokerIdRequestHeader(clusterName, brokerName, brokerId, registerCheckCode);
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.APPLY_BROKER_ID, requestHeader);
+        final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
+        assert response != null;
+        if (response.getCode() == SUCCESS) {
+            return (ApplyBrokerIdResponseHeader) response.decodeCommandCustomHeader(ApplyBrokerIdResponseHeader.class);
+        }
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+    public RegisterSuccessResponseHeader registerSuccess(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception {
+        final RegisterSuccessRequestHeader requestHeader = new RegisterSuccessRequestHeader(clusterName, brokerName, brokerId, brokerAddress);
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_SUCCESS, requestHeader);
+        final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
+        assert response != null;
+        if (response.getCode() == SUCCESS) {
+            return (RegisterSuccessResponseHeader) response.decodeCommandCustomHeader(RegisterSuccessResponseHeader.class);
+        }
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
     /**
      * Get broker replica info
      */
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index bb5ced847..ab8880625 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -3076,14 +3076,14 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
     }
 
     public ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName, String brokerName,
-        String brokerAddr) throws MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, RemotingCommandException {
+        Long brokerId) throws MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, RemotingCommandException {
 
         //get controller leader address
         final GetMetaDataResponseHeader controllerMetaData = this.getControllerMetaData(controllerAddr);
         assert controllerMetaData != null;
         assert controllerMetaData.getControllerLeaderAddress() != null;
         final String leaderAddress = controllerMetaData.getControllerLeaderAddress();
-        ElectMasterRequestHeader electRequestHeader = ElectMasterRequestHeader.ofAdminTrigger(clusterName, brokerName, brokerAddr);
+        ElectMasterRequestHeader electRequestHeader = ElectMasterRequestHeader.ofAdminTrigger(clusterName, brokerName, brokerId);
 
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, electRequestHeader);
         final RemotingCommand response = this.remotingClient.invokeSync(leaderAddress, request, 3000);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 662403192..116607cc1 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -204,6 +204,9 @@ public class ControllerManager {
         controllerRemotingServer.registerProcessor(RequestCode.UPDATE_CONTROLLER_CONFIG, controllerRequestProcessor, this.controllerRequestExecutor);
         controllerRemotingServer.registerProcessor(RequestCode.GET_CONTROLLER_CONFIG, controllerRequestProcessor, this.controllerRequestExecutor);
         controllerRemotingServer.registerProcessor(RequestCode.CLEAN_BROKER_DATA, controllerRequestProcessor, this.controllerRequestExecutor);
+        controllerRemotingServer.registerProcessor(RequestCode.GET_NEXT_BROKER_ID, controllerRequestProcessor, this.controllerRequestExecutor);
+        controllerRemotingServer.registerProcessor(RequestCode.APPLY_BROKER_ID, controllerRequestProcessor, this.controllerRequestExecutor);
+        controllerRemotingServer.registerProcessor(RequestCode.REGISTER_SUCCESS, controllerRequestProcessor, this.controllerRequestExecutor);
     }
 
     public void start() {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index ec84f4ca7..6dea0e1d7 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -45,6 +45,7 @@ import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoReq
 import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
 
+import static org.apache.rocketmq.remoting.protocol.RequestCode.APPLY_BROKER_ID;
 import static org.apache.rocketmq.remoting.protocol.RequestCode.BROKER_HEARTBEAT;
 import static org.apache.rocketmq.remoting.protocol.RequestCode.CLEAN_BROKER_DATA;
 import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET;
@@ -54,6 +55,8 @@ import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_R
 import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_SYNC_STATE_DATA;
 import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_REGISTER_BROKER;
 import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONTROLLER_CONFIG;
+import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_NEXT_BROKER_ID;
+import static org.apache.rocketmq.remoting.protocol.RequestCode.REGISTER_SUCCESS;
 import static org.apache.rocketmq.remoting.protocol.RequestCode.UPDATE_CONTROLLER_CONFIG;
 
 /**
@@ -99,6 +102,12 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
                 return this.handleGetControllerConfig(ctx, request);
             case CLEAN_BROKER_DATA:
                 return this.handleCleanBrokerData(ctx, request);
+            case GET_NEXT_BROKER_ID:
+                return this.handleGetNextBrokerId(ctx, request);
+            case APPLY_BROKER_ID:
+                return this.handleApplyBrokerId(ctx, request);
+            case REGISTER_SUCCESS:
+                return this.handleRegisterSuccess(ctx, request);
             default: {
                 final String error = " request type " + request.getCode() + " not supported";
                 return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index 6c93a5d46..62fd0198f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -275,5 +275,11 @@ public class RequestCode {
      */
     public static final int CLEAN_BROKER_DATA = 1011;
 
+    public static final int GET_NEXT_BROKER_ID = 1012;
+
+    public static final int APPLY_BROKER_ID = 1013;
+
+    public static final int REGISTER_SUCCESS = 1014;
+
 
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java
index 8e4e3770e..6554fe509 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java
@@ -120,4 +120,6 @@ public class ResponseCode extends RemotingSysResponseCode {
     public static final int CONTROLLER_ELECT_MASTER_FAILED = 2012;
     
     public static final int CONTROLLER_ALTER_SYNC_STATE_SET_FAILED = 2013;
+
+    public static final int CONTROLLER_BROKER_ID_INVALID = 2014;
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
index e8e1ea944..780f519a7 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
@@ -30,6 +30,13 @@ public class ApplyBrokerIdRequestHeader implements CommandCustomHeader {
 
     private String registerCheckCode;
 
+    public ApplyBrokerIdRequestHeader(String clusterName, String brokerName, Long appliedBrokerId, String registerCheckCode) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.appliedBrokerId = appliedBrokerId;
+        this.registerCheckCode = registerCheckCode;
+    }
+
     @Override
     public void checkFields() throws RemotingCommandException {
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java
index 382297f21..d83164747 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java
@@ -26,17 +26,11 @@ public class ApplyBrokerIdResponseHeader implements CommandCustomHeader {
 
     private String brokerName;
 
-    // if nextBrokerId isn't null, means that matched ApplyBrokerIdRequest is failed.
-    private Long nextBrokerId;
 
-    public ApplyBrokerIdResponseHeader(String clusterName, String brokerName) {
-        this(clusterName, brokerName, null);
-    }
 
-    public ApplyBrokerIdResponseHeader(String clusterName, String brokerName, Long nextBrokerId) {
+    public ApplyBrokerIdResponseHeader(String clusterName, String brokerName) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
-        this.nextBrokerId = nextBrokerId;
     }
 
 
@@ -45,7 +39,6 @@ public class ApplyBrokerIdResponseHeader implements CommandCustomHeader {
         return "ApplyBrokerIdResponseHeader{" +
                 "clusterName='" + clusterName + '\'' +
                 ", brokerName='" + brokerName + '\'' +
-                ", nextBrokerId=" + nextBrokerId +
                 '}';
     }
 
@@ -53,4 +46,5 @@ public class ApplyBrokerIdResponseHeader implements CommandCustomHeader {
     public void checkFields() throws RemotingCommandException {
 
     }
+
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
index 04f522a7a..ddec9b0ea 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
@@ -47,4 +47,8 @@ public class GetNextBrokerIdResponseHeader implements CommandCustomHeader {
     public void checkFields() throws RemotingCommandException {
 
     }
+
+    public Long getNextBrokerId() {
+        return nextBrokerId;
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java
similarity index 73%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
copy to remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java
index e8e1ea944..721509ac9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java
@@ -20,15 +20,22 @@ package org.apache.rocketmq.remoting.protocol.header.controller.register;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class ApplyBrokerIdRequestHeader implements CommandCustomHeader {
+public class RegisterSuccessRequestHeader implements CommandCustomHeader {
 
     private String clusterName;
 
     private String brokerName;
 
-    private Long appliedBrokerId;
+    private Long brokerId;
 
-    private String registerCheckCode;
+    private String brokerAddress;
+
+    public RegisterSuccessRequestHeader(String clusterName, String brokerName, Long brokerId, String brokerAddress) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.brokerId = brokerId;
+        this.brokerAddress = brokerAddress;
+    }
 
     @Override
     public void checkFields() throws RemotingCommandException {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java
similarity index 83%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
copy to remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java
index e8e1ea944..953190480 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java
@@ -20,18 +20,11 @@ package org.apache.rocketmq.remoting.protocol.header.controller.register;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class ApplyBrokerIdRequestHeader implements CommandCustomHeader {
-
-    private String clusterName;
-
-    private String brokerName;
-
-    private Long appliedBrokerId;
-
-    private String registerCheckCode;
+public class RegisterSuccessResponseHeader implements CommandCustomHeader {
 
     @Override
     public void checkFields() throws RemotingCommandException {
 
     }
+
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index e29fdc2b0..0a722c765 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -41,6 +41,10 @@ public class MessageStoreConfig {
     private String storePathEpochFile = System.getProperty("user.home") + File.separator + "store"
         + File.separator + "epochFileCheckpoint";
 
+    private String storePathMetadata = storePathRootDir + File.separator + "metadata"; // initial value <storePathRootDir>/metadata, derived when this field is initialized and not recomputed afterwards
+
+    private String storePathTempMetadata = storePathRootDir + File.separator + "metadata-temp"; // initial value <storePathRootDir>/metadata-temp, derived when this field is initialized and not recomputed afterwards
+
     private String readOnlyCommitLogStorePaths = null;
 
     // CommitLog file size,default is 1G
@@ -627,6 +631,22 @@ public class MessageStoreConfig {
         this.storePathEpochFile = storePathEpochFile;
     }
 
+    public String getStorePathMetadata() { // returns the storePathMetadata field as currently set
+        return storePathMetadata;
+    }
+
+    public void setStorePathMetadata(String storePathMetadata) { // replaces the storePathRootDir-derived initial value; no validation
+        this.storePathMetadata = storePathMetadata;
+    }
+
+    public String getStorePathTempMetadata() { // returns the storePathTempMetadata field as currently set
+        return storePathTempMetadata;
+    }
+
+    public void setStorePathTempMetadata(String storePathTempMetadata) { // replaces the storePathRootDir-derived initial value; no validation
+        this.storePathTempMetadata = storePathTempMetadata;
+    }
+
     public String getDeleteWhen() {
         return deleteWhen;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java
new file mode 100644
index 000000000..24f400204
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Broker identity (cluster name, broker name and broker id) that is persisted as a single
+ * {@code clusterName;brokerName;brokerId} record.
+ */
+public class BrokerMetadata extends MetadataFile {
+
+    /** Number of ';'-separated fields in a persisted record. */
+    private static final int FIELD_COUNT = 3;
+
+    private String clusterName;
+
+    private String brokerName;
+
+    private Long brokerId;
+
+    /**
+     * @param filePath path of the file backing this metadata
+     */
+    public BrokerMetadata(String filePath) {
+        this.filePath = filePath;
+    }
+
+    /**
+     * Replaces all in-memory fields and writes them to the backing file.
+     *
+     * @throws Exception if persisting the record fails
+     */
+    public void updateAndPersist(String clusterName, String brokerName, Long brokerId) throws Exception {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.brokerId = brokerId;
+        writeToFile();
+    }
+
+    /**
+     * @return the record encoded as {@code clusterName;brokerName;brokerId}
+     */
+    @Override
+    public String encodeToStr() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(clusterName).append(";");
+        sb.append(brokerName).append(";");
+        sb.append(brokerId);
+        return sb.toString();
+    }
+
+    /**
+     * Parses a {@code clusterName;brokerName;brokerId} record into the in-memory fields.
+     * A {@code null} input leaves the fields untouched.
+     *
+     * @param dataStr the persisted record, or {@code null}
+     * @throws IllegalArgumentException if the record has fewer than three fields or the broker id
+     *                                  is not a valid number
+     */
+    @Override
+    public void decodeFromStr(String dataStr) {
+        if (dataStr == null) {
+            return;
+        }
+        String[] dataArr = dataStr.split(";");
+        // Check the field count before indexing so a truncated or corrupt file is reported with
+        // context instead of as a bare ArrayIndexOutOfBoundsException.
+        if (dataArr.length < FIELD_COUNT) {
+            throw new IllegalArgumentException("Malformed broker metadata in " + this.filePath
+                + ": expected " + FIELD_COUNT + " fields but found " + dataArr.length);
+        }
+        // Parse the id first so a bad number does not leave the fields half-updated.
+        Long parsedBrokerId = Long.valueOf(dataArr[2]);
+        this.clusterName = dataArr[0];
+        this.brokerName = dataArr[1];
+        this.brokerId = parsedBrokerId;
+    }
+
+    /**
+     * @return {@code true} when cluster name and broker name are non-empty and broker id is set
+     */
+    @Override
+    public boolean isLoaded() {
+        return StringUtils.isNotEmpty(this.clusterName) && StringUtils.isNotEmpty(this.brokerName) && brokerId != null;
+    }
+
+    /**
+     * Resets all in-memory fields to {@code null}; the backing file is not touched.
+     */
+    @Override
+    public void clearInMem() {
+        this.clusterName = null;
+        this.brokerName = null;
+        this.brokerId = null;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public Long getBrokerId() {
+        return brokerId;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java
new file mode 100644
index 000000000..2e3c3ba99
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import org.apache.rocketmq.common.MixAll;
+
+import java.io.File;
+
+/**
+ * Base class for a small piece of metadata that is kept in memory and mirrored to a single file
+ * at {@link #filePath}. Subclasses define the fields and their string encoding.
+ */
+public abstract class MetadataFile {
+
+    /** Path of the backing file; assigned by subclasses. */
+    protected String filePath;
+
+    /**
+     * Encodes the in-memory fields into the string that is written to the file.
+     *
+     * @return the encoded representation
+     */
+    public abstract String encodeToStr();
+
+    /**
+     * Restores the in-memory fields from a string previously read from the file.
+     *
+     * @param dataStr the file content as returned by {@code MixAll.file2String}
+     */
+    public abstract void decodeFromStr(String dataStr);
+
+    /**
+     * @return {@code true} if the in-memory fields currently hold a complete record
+     */
+    public abstract boolean isLoaded();
+
+    /**
+     * Resets the in-memory fields without touching the file.
+     */
+    public abstract void clearInMem();
+
+    /**
+     * Removes any existing file and writes the current encoded state to {@link #filePath}.
+     *
+     * @throws Exception if writing the file fails
+     */
+    public void writeToFile() throws Exception {
+        deleteFile();
+        MixAll.string2File(encodeToStr(), this.filePath);
+    }
+
+    /**
+     * Reads the file at {@link #filePath} and decodes its content into the in-memory fields.
+     *
+     * @throws Exception if reading or decoding fails
+     */
+    public void readFromFile() throws Exception {
+        String dataStr = MixAll.file2String(filePath);
+        decodeFromStr(dataStr);
+    }
+
+    /**
+     * @return {@code true} if a file currently exists at {@link #filePath}
+     */
+    public boolean fileExists() {
+        File file = new File(filePath);
+        return file.exists();
+    }
+
+    /**
+     * Deletes the backing file immediately; a missing file is not an error.
+     */
+    public void deleteFile() {
+        File file = new File(filePath);
+        // File#deleteOnExit() would only register the path for removal at JVM shutdown, which
+        // leaves stale data readable until then and erases a file rewritten in the meantime.
+        file.delete();
+    }
+
+    /**
+     * Clears the in-memory fields and deletes the backing file.
+     */
+    public void clear() {
+        clearInMem();
+        deleteFile();
+    }
+
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java
new file mode 100644
index 000000000..31b4aa5e8
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Broker metadata record that additionally carries a register check code; persisted as a single
+ * {@code clusterName;brokerName;brokerId;registerCheckCode} record.
+ */
+public class TempBrokerMetadata extends MetadataFile {
+
+    /** Number of ';'-separated fields in a persisted record. */
+    private static final int FIELD_COUNT = 4;
+
+    private String clusterName;
+
+    private String brokerName;
+
+    private Long brokerId;
+
+    private String registerCheckCode;
+
+    /**
+     * Creates an instance with all fields unset.
+     *
+     * @param filePath path of the file backing this metadata
+     */
+    public TempBrokerMetadata(String filePath) {
+        this(filePath, null, null, null, null);
+    }
+
+    /**
+     * Creates an instance holding the given values in memory; nothing is written to the file.
+     */
+    public TempBrokerMetadata(String filePath, String clusterName, String brokerName, Long brokerId, String registerCheckCode) {
+        this.filePath = filePath;
+        this.clusterName = clusterName;
+        this.brokerId = brokerId;
+        this.brokerName = brokerName;
+        this.registerCheckCode = registerCheckCode;
+    }
+
+    /**
+     * Replaces all in-memory fields and writes them to the backing file.
+     *
+     * @throws Exception if persisting the record fails
+     */
+    public void updateAndPersist(String clusterName, String brokerName, Long brokerId, String registerCheckCode) throws Exception {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.brokerId = brokerId;
+        this.registerCheckCode = registerCheckCode;
+        writeToFile();
+    }
+
+    /**
+     * @return the record encoded as {@code clusterName;brokerName;brokerId;registerCheckCode}
+     */
+    @Override
+    public String encodeToStr() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(clusterName).append(";");
+        sb.append(brokerName).append(";");
+        sb.append(brokerId).append(";");
+        sb.append(registerCheckCode);
+        return sb.toString();
+    }
+
+    /**
+     * Parses a {@code clusterName;brokerName;brokerId;registerCheckCode} record into the fields.
+     * A {@code null} input leaves the fields untouched.
+     *
+     * @param dataStr the persisted record, or {@code null}
+     * @throws IllegalArgumentException if the record has fewer than four fields or the broker id
+     *                                  is not a valid number
+     */
+    @Override
+    public void decodeFromStr(String dataStr) {
+        if (dataStr == null) {
+            return;
+        }
+        String[] dataArr = dataStr.split(";");
+        // Check the field count before indexing so a truncated or corrupt file is reported with
+        // context; the raw record is not echoed because it contains the register check code.
+        if (dataArr.length < FIELD_COUNT) {
+            throw new IllegalArgumentException("Malformed temp broker metadata in " + this.filePath
+                + ": expected " + FIELD_COUNT + " fields but found " + dataArr.length);
+        }
+        // Parse the id first so a bad number does not leave the fields half-updated.
+        Long parsedBrokerId = Long.valueOf(dataArr[2]);
+        this.clusterName = dataArr[0];
+        this.brokerName = dataArr[1];
+        this.brokerId = parsedBrokerId;
+        this.registerCheckCode = dataArr[3];
+    }
+
+    /**
+     * @return {@code true} when all string fields are non-empty and broker id is set
+     */
+    @Override
+    public boolean isLoaded() {
+        return StringUtils.isNotEmpty(this.clusterName) && StringUtils.isNotEmpty(this.brokerName) && brokerId != null && StringUtils.isNotEmpty(this.registerCheckCode);
+    }
+
+    /**
+     * Resets all in-memory fields to {@code null}; the backing file is not touched.
+     */
+    @Override
+    public void clearInMem() {
+        this.clusterName = null;
+        this.brokerName = null;
+        this.brokerId = null;
+        this.registerCheckCode = null;
+    }
+
+    public Long getBrokerId() {
+        return brokerId;
+    }
+
+    public String getRegisterCheckCode() {
+        return registerCheckCode;
+    }
+}