You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/02/07 01:14:30 UTC
[rocketmq] 04/14: feat(broker): implement the general register to controller protocol
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 468a5d85dd9a6762e63a6fcb06aafac472cbfa40
Author: TheR1sing3un <th...@163.com>
AuthorDate: Sun Feb 5 13:11:51 2023 +0800
feat(broker): implement the general register to controller protocol
1. implement the general register to controller protocol
---
.../broker/controller/ReplicasManager.java | 182 ++++++++++++++++++++-
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 40 +++++
.../rocketmq/client/impl/MQClientAPIImpl.java | 4 +-
.../rocketmq/controller/ControllerManager.java | 3 +
.../processor/ControllerRequestProcessor.java | 9 +
.../rocketmq/remoting/protocol/RequestCode.java | 6 +
.../rocketmq/remoting/protocol/ResponseCode.java | 2 +
.../register/ApplyBrokerIdRequestHeader.java | 7 +
.../register/ApplyBrokerIdResponseHeader.java | 10 +-
.../register/GetNextBrokerIdResponseHeader.java | 4 +
...ader.java => RegisterSuccessRequestHeader.java} | 13 +-
...der.java => RegisterSuccessResponseHeader.java} | 11 +-
.../rocketmq/store/config/MessageStoreConfig.java | 20 +++
.../store/ha/autoswitch/BrokerMetadata.java | 82 ++++++++++
.../rocketmq/store/ha/autoswitch/MetadataFile.java | 60 +++++++
.../store/ha/autoswitch/TempBrokerMetadata.java | 92 +++++++++++
16 files changed, 519 insertions(+), 26 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 8a03015f7..e3c9382f8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -47,9 +47,14 @@ import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+import org.apache.rocketmq.store.ha.autoswitch.BrokerMetadata;
+import org.apache.rocketmq.store.ha.autoswitch.TempBrokerMetadata;
import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_METADATA_NOT_EXIST;
@@ -78,6 +83,8 @@ public class ReplicasManager {
private volatile String controllerLeaderAddress = "";
private volatile State state = State.INITIAL;
+ private RegisterState registerState = RegisterState.INITIAL;
+
private ScheduledFuture<?> checkSyncStateSetTaskFuture;
private ScheduledFuture<?> slaveSyncFuture;
@@ -85,6 +92,10 @@ public class ReplicasManager {
private Long masterBrokerId;
+ private BrokerMetadata brokerMetadata;
+
+ private TempBrokerMetadata tempBrokerMetadata;
+
private Set<Long> syncStateSet;
private int syncStateSetEpoch = 0;
private String masterAddress = "";
@@ -103,6 +114,8 @@ public class ReplicasManager {
this.availableControllerAddresses = new ConcurrentHashMap<>();
this.syncStateSet = new HashSet<>();
this.localAddress = brokerController.getBrokerAddr();
+ this.brokerMetadata = new BrokerMetadata(this.brokerController.getMessageStoreConfig().getStorePathMetadata());
+ this.tempBrokerMetadata = new TempBrokerMetadata(this.brokerController.getMessageStoreConfig().getStorePathTempMetadata());
}
public long getConfirmOffset() {
@@ -113,12 +126,22 @@ public class ReplicasManager {
INITIAL,
FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
- FIRST_TIME_WAIT_MASTER_IS_ELECTED,
+ FIRST_TIME_REGISTER_TO_CONTROLLER_DONE,
RUNNING,
SHUTDOWN,
}
+ /**
+ * Progress of the broker-id registration handshake with the controller.
+ * The state is advanced by registerBrokerToController2(), which falls back to INITIAL when applying the broker id fails.
+ */
+ enum RegisterState {
+ /** Nothing usable has been persisted yet; a candidate broker id still has to be requested. */
+ INITIAL,
+
+ /** The temp metadata file (candidate broker id plus register check code) has been written or loaded. */
+ CREATE_TEMP_METADATA_FILE_DONE,
+
+ /** The final metadata file has been written or loaded. */
+ CREATE_METADATA_FILE_DONE,
+
+ /** The registerSuccess request to the controller completed. */
+ REGISTERED
+ }
+
public void start() {
updateControllerAddr();
scanAvailableControllerAddresses();
@@ -157,13 +180,13 @@ public class ReplicasManager {
if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
if (registerBrokerToController()) {
LOGGER.info("First time register broker success");
- this.state = State.FIRST_TIME_WAIT_MASTER_IS_ELECTED;
+ this.state = State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE;
} else {
return false;
}
}
- if (this.state == State.FIRST_TIME_WAIT_MASTER_IS_ELECTED) {
+ if (this.state == State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE) {
if (StringUtils.isNotEmpty(this.masterAddress) || brokerElect()) {
LOGGER.info("Master in this broker set is elected");
this.state = State.RUNNING;
@@ -343,7 +366,7 @@ public class ReplicasManager {
}
private boolean registerBrokerToController() {
- // Register this broker to controller, get brokerId and masterAddress.
+ // Register this broker to controller to get a stable and credible broker id, and persist metadata to local file.
try {
final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getControllerHeartBeatTimeoutMills(),
@@ -373,6 +396,157 @@ public class ReplicasManager {
}
}
+ /**
+ * Register broker to controller, and persist the metadata to file
+ * @return whether registering process succeeded
+ */
+ private boolean registerBrokerToController2() {
+ try {
+ // 1. confirm now registering state
+ confirmNowRegisteringState();
+ // 2. get next assigning brokerId, and create temp metadata file
+ if (this.registerState == RegisterState.INITIAL) {
+ Long nextBrokerId = getNextBrokerId();
+ if (nextBrokerId == null || !createTempMetadataFile(nextBrokerId)) {
+ return false;
+ }
+ this.registerState = RegisterState.CREATE_TEMP_METADATA_FILE_DONE;
+ }
+ // 3. apply brokerId to controller, and create metadata file
+ if (this.registerState == RegisterState.CREATE_TEMP_METADATA_FILE_DONE) {
+ if (!applyBrokerId()) {
+ // apply broker id failed, means that this brokerId has been used
+ // delete temp metadata file
+ this.tempBrokerMetadata.clear();
+ // back to the first step
+ this.registerState = RegisterState.INITIAL;
+ return false;
+ }
+ if (!createMetadataFileAndDeleteTemp()) {
+ return false;
+ }
+ this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
+ }
+ // 4. register success
+ if (this.registerState == RegisterState.CREATE_METADATA_FILE_DONE) {
+ if (!registerSuccess()) {
+ return false;
+ }
+ this.registerState = RegisterState.REGISTERED;
+ }
+ return true;
+ } catch (final Exception e) {
+ LOGGER.error("Failed to register broker to controller", e);
+ return false;
+ }
+ }
+
+ /**
+ * Send GetNextBrokerRequest to controller for getting next assigning brokerId in this broker-set
+ * @return next brokerId in this broker-set
+ */
+ private Long getNextBrokerId() {
+ try {
+ GetNextBrokerIdResponseHeader nextBrokerIdResp = this.brokerOuterAPI.getNextBrokerId(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.controllerLeaderAddress);
+ return nextBrokerIdResp.getNextBrokerId();
+ } catch (Exception e) {
+ LOGGER.error("fail to get next broker id from controller", e);
+ return null;
+ }
+ }
+
+ /**
+ * Create temp metadata file in local file system, records the brokerId and registerCheckCode
+ * @param brokerId the brokerId that is expected to be assigned
+ * @return whether the temp meta file is created successfully
+ */
+
+ private boolean createTempMetadataFile(Long brokerId) {
+ // generate register check code, format like that: $ipAddress;$timestamp
+ String registerCheckCode = this.localAddress + ";" + System.currentTimeMillis();
+ try {
+ this.tempBrokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerId, registerCheckCode);
+ return true;
+ } catch (Exception e) {
+ LOGGER.error("update and persist temp broker metadata file failed", e);
+ this.tempBrokerMetadata.clear();
+ return false;
+ }
+ }
+
+    /**
+     * Sends the applyBrokerId request for the broker id and register check code stored in the temp metadata.
+     *
+     * @return {@code true} if the request completed without an exception; {@code false} otherwise
+     */
+    private boolean applyBrokerId() {
+        boolean applied = false;
+        try {
+            this.brokerOuterAPI.applyBrokerId(
+                brokerConfig.getBrokerClusterName(),
+                brokerConfig.getBrokerName(),
+                tempBrokerMetadata.getBrokerId(),
+                tempBrokerMetadata.getRegisterCheckCode(),
+                this.controllerLeaderAddress);
+            applied = true;
+        } catch (Exception e) {
+            LOGGER.error("fail to apply broker id", e);
+        }
+        return applied;
+    }
+
+    /**
+     * Persists the final metadata file from the broker id held in the temp metadata, removes the
+     * temp metadata, and publishes the persisted broker id to {@code this.brokerId}.
+     *
+     * @return {@code true} if the metadata was persisted and the temp metadata cleared;
+     *         {@code false} if any of those steps threw (the metadata is cleared again in that case)
+     */
+    private boolean createMetadataFileAndDeleteTemp() {
+        try {
+            this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
+            this.tempBrokerMetadata.clear();
+            // Without this assignment a first-time registration reaches registerSuccess() with a
+            // brokerId that was never set: confirmNowRegisteringState() only assigns it when the
+            // metadata file already existed at entry.
+            this.brokerId = this.brokerMetadata.getBrokerId();
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("fail to create metadata file", e);
+            this.brokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Reports to the controller that this broker finished registering, passing the cluster name,
+     * broker name, broker id and local address.
+     *
+     * @return {@code true} if the request completed without an exception; {@code false} otherwise
+     */
+    private boolean registerSuccess() {
+        boolean acknowledged = false;
+        try {
+            this.brokerOuterAPI.registerSuccess(
+                brokerConfig.getBrokerClusterName(),
+                brokerConfig.getBrokerName(),
+                brokerId,
+                localAddress,
+                controllerLeaderAddress);
+            acknowledged = true;
+        } catch (Exception e) {
+            LOGGER.error("fail to send registerSuccess request to controller", e);
+        }
+        return acknowledged;
+    }
+
+ /**
+ * Confirm the registering state now
+ */
+ private void confirmNowRegisteringState() {
+ // 1. check if metadata exist
+ try {
+ this.brokerMetadata.readFromFile();
+ } catch (Exception e) {
+ LOGGER.error("read metadata file failed", e);
+ }
+ if (this.brokerMetadata.isLoaded()) {
+ this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
+ this.brokerId = brokerMetadata.getBrokerId();
+ return;
+ }
+ // 2. check if temp metadata exist
+ try {
+ this.tempBrokerMetadata.readFromFile();
+ } catch (Exception e) {
+ LOGGER.error("read temp metadata file failed", e);
+ }
+ if (this.tempBrokerMetadata.isLoaded()) {
+ this.registerState = RegisterState.CREATE_TEMP_METADATA_FILE_DONE;
+ }
+ }
+
/**
* Scheduling sync broker metadata form controller.
*/
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index f123549cf..82e9ea33e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -107,8 +107,14 @@ import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSet
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionRequestHeader;
@@ -130,6 +136,7 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMetrics;
import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
+import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_ID_INVALID;
import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_METADATA_NOT_EXIST;
import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED;
import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_ELECT_MASTER_FAILED;
@@ -1213,6 +1220,39 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
+    /**
+     * Sends a GET_NEXT_BROKER_ID request to the given controller and decodes the response header.
+     * The call is synchronous with a timeout argument of 3000 (presumably milliseconds — confirm against invokeSync).
+     *
+     * @param clusterName       cluster name placed in the request header
+     * @param brokerName        broker name placed in the request header
+     * @param controllerAddress address the request is sent to
+     * @return the decoded response header when the response code is SUCCESS
+     * @throws Exception an {@code MQBrokerException} carrying code and remark for any other response code,
+     *                   or whatever the remoting call / header decoding throws
+     */
+    public GetNextBrokerIdResponseHeader getNextBrokerId(final String clusterName, final String brokerName, final String controllerAddress) throws Exception {
+        final RemotingCommand request = RemotingCommand.createRequestCommand(
+            RequestCode.GET_NEXT_BROKER_ID, new GetNextBrokerIdRequestHeader(clusterName, brokerName));
+        final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
+        assert response != null;
+        if (response.getCode() != SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+        return (GetNextBrokerIdResponseHeader) response.decodeCommandCustomHeader(GetNextBrokerIdResponseHeader.class);
+    }
+
+    /**
+     * Sends an APPLY_BROKER_ID request to the given controller and decodes the response header.
+     * The call is synchronous with a timeout argument of 3000 (presumably milliseconds — confirm against invokeSync).
+     *
+     * @param clusterName       cluster name placed in the request header
+     * @param brokerName        broker name placed in the request header
+     * @param brokerId          broker id being applied for
+     * @param registerCheckCode check code sent together with the broker id
+     * @param controllerAddress address the request is sent to
+     * @return the decoded response header when the response code is SUCCESS
+     * @throws Exception an {@code MQBrokerException} carrying code and remark for any other response code,
+     *                   or whatever the remoting call / header decoding throws
+     */
+    public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, final String brokerName, final Long brokerId, final String registerCheckCode, final String controllerAddress) throws Exception {
+        final RemotingCommand request = RemotingCommand.createRequestCommand(
+            RequestCode.APPLY_BROKER_ID, new ApplyBrokerIdRequestHeader(clusterName, brokerName, brokerId, registerCheckCode));
+        final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
+        assert response != null;
+        if (response.getCode() != SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+        return (ApplyBrokerIdResponseHeader) response.decodeCommandCustomHeader(ApplyBrokerIdResponseHeader.class);
+    }
+
+    /**
+     * Sends a REGISTER_SUCCESS request to the given controller and decodes the response header.
+     * The call is synchronous with a timeout argument of 3000 (presumably milliseconds — confirm against invokeSync).
+     *
+     * @param clusterName       cluster name placed in the request header
+     * @param brokerName        broker name placed in the request header
+     * @param brokerId          broker id placed in the request header
+     * @param brokerAddress     broker address placed in the request header
+     * @param controllerAddress address the request is sent to
+     * @return the decoded response header when the response code is SUCCESS
+     * @throws Exception an {@code MQBrokerException} carrying code and remark for any other response code,
+     *                   or whatever the remoting call / header decoding throws
+     */
+    public RegisterSuccessResponseHeader registerSuccess(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception {
+        final RemotingCommand request = RemotingCommand.createRequestCommand(
+            RequestCode.REGISTER_SUCCESS, new RegisterSuccessRequestHeader(clusterName, brokerName, brokerId, brokerAddress));
+        final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
+        assert response != null;
+        if (response.getCode() != SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+        return (RegisterSuccessResponseHeader) response.decodeCommandCustomHeader(RegisterSuccessResponseHeader.class);
+    }
+
/**
* Get broker replica info
*/
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index bb5ced847..ab8880625 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -3076,14 +3076,14 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
}
public ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName, String brokerName,
- String brokerAddr) throws MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, RemotingCommandException {
+ Long brokerId) throws MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, RemotingCommandException {
//get controller leader address
final GetMetaDataResponseHeader controllerMetaData = this.getControllerMetaData(controllerAddr);
assert controllerMetaData != null;
assert controllerMetaData.getControllerLeaderAddress() != null;
final String leaderAddress = controllerMetaData.getControllerLeaderAddress();
- ElectMasterRequestHeader electRequestHeader = ElectMasterRequestHeader.ofAdminTrigger(clusterName, brokerName, brokerAddr);
+ ElectMasterRequestHeader electRequestHeader = ElectMasterRequestHeader.ofAdminTrigger(clusterName, brokerName, brokerId);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, electRequestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(leaderAddress, request, 3000);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 662403192..116607cc1 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -204,6 +204,9 @@ public class ControllerManager {
controllerRemotingServer.registerProcessor(RequestCode.UPDATE_CONTROLLER_CONFIG, controllerRequestProcessor, this.controllerRequestExecutor);
controllerRemotingServer.registerProcessor(RequestCode.GET_CONTROLLER_CONFIG, controllerRequestProcessor, this.controllerRequestExecutor);
controllerRemotingServer.registerProcessor(RequestCode.CLEAN_BROKER_DATA, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.GET_NEXT_BROKER_ID, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.APPLY_BROKER_ID, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.REGISTER_SUCCESS, controllerRequestProcessor, this.controllerRequestExecutor);
}
public void start() {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index ec84f4ca7..6dea0e1d7 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -45,6 +45,7 @@ import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoReq
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
+import static org.apache.rocketmq.remoting.protocol.RequestCode.APPLY_BROKER_ID;
import static org.apache.rocketmq.remoting.protocol.RequestCode.BROKER_HEARTBEAT;
import static org.apache.rocketmq.remoting.protocol.RequestCode.CLEAN_BROKER_DATA;
import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET;
@@ -54,6 +55,8 @@ import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_R
import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_SYNC_STATE_DATA;
import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_REGISTER_BROKER;
import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONTROLLER_CONFIG;
+import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_NEXT_BROKER_ID;
+import static org.apache.rocketmq.remoting.protocol.RequestCode.REGISTER_SUCCESS;
import static org.apache.rocketmq.remoting.protocol.RequestCode.UPDATE_CONTROLLER_CONFIG;
/**
@@ -99,6 +102,12 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
return this.handleGetControllerConfig(ctx, request);
case CLEAN_BROKER_DATA:
return this.handleCleanBrokerData(ctx, request);
+ case GET_NEXT_BROKER_ID:
+ return this.handleGetNextBrokerId(ctx, request);
+ case APPLY_BROKER_ID:
+ return this.handleApplyBrokerId(ctx, request);
+ case REGISTER_SUCCESS:
+ return this.handleRegisterSuccess(ctx, request);
default: {
final String error = " request type " + request.getCode() + " not supported";
return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index 6c93a5d46..62fd0198f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -275,5 +275,11 @@ public class RequestCode {
*/
public static final int CLEAN_BROKER_DATA = 1011;
+ public static final int GET_NEXT_BROKER_ID = 1012;
+
+ public static final int APPLY_BROKER_ID = 1013;
+
+ public static final int REGISTER_SUCCESS = 1014;
+
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java
index 8e4e3770e..6554fe509 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java
@@ -120,4 +120,6 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int CONTROLLER_ELECT_MASTER_FAILED = 2012;
public static final int CONTROLLER_ALTER_SYNC_STATE_SET_FAILED = 2013;
+
+ public static final int CONTROLLER_BROKER_ID_INVALID = 2014;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
index e8e1ea944..780f519a7 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
@@ -30,6 +30,13 @@ public class ApplyBrokerIdRequestHeader implements CommandCustomHeader {
private String registerCheckCode;
+    /**
+     * No-arg constructor. Declaring the parameterized constructor below removes the implicit
+     * default constructor, so it is restored explicitly.
+     * NOTE(review): header classes are expected to be instantiated reflectively through a no-arg
+     * constructor when a command is decoded — confirm against RemotingCommand#decodeCommandCustomHeader.
+     */
+    public ApplyBrokerIdRequestHeader() {
+    }
+
+    /**
+     * Creates a fully populated request header.
+     *
+     * @param clusterName       name of the cluster the broker belongs to
+     * @param brokerName        name of the broker-set
+     * @param appliedBrokerId   broker id the broker applies for
+     * @param registerCheckCode check code sent along with the applied broker id
+     */
+    public ApplyBrokerIdRequestHeader(String clusterName, String brokerName, Long appliedBrokerId, String registerCheckCode) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.appliedBrokerId = appliedBrokerId;
+        this.registerCheckCode = registerCheckCode;
+    }
+
@Override
public void checkFields() throws RemotingCommandException {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java
index 382297f21..d83164747 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java
@@ -26,17 +26,11 @@ public class ApplyBrokerIdResponseHeader implements CommandCustomHeader {
private String brokerName;
- // if nextBrokerId isn't null, means that matched ApplyBrokerIdRequest is failed.
- private Long nextBrokerId;
- public ApplyBrokerIdResponseHeader(String clusterName, String brokerName) {
- this(clusterName, brokerName, null);
- }
- public ApplyBrokerIdResponseHeader(String clusterName, String brokerName, Long nextBrokerId) {
+ public ApplyBrokerIdResponseHeader(String clusterName, String brokerName) {
this.clusterName = clusterName;
this.brokerName = brokerName;
- this.nextBrokerId = nextBrokerId;
}
@@ -45,7 +39,6 @@ public class ApplyBrokerIdResponseHeader implements CommandCustomHeader {
return "ApplyBrokerIdResponseHeader{" +
"clusterName='" + clusterName + '\'' +
", brokerName='" + brokerName + '\'' +
- ", nextBrokerId=" + nextBrokerId +
'}';
}
@@ -53,4 +46,5 @@ public class ApplyBrokerIdResponseHeader implements CommandCustomHeader {
public void checkFields() throws RemotingCommandException {
}
+
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
index 04f522a7a..ddec9b0ea 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
@@ -47,4 +47,8 @@ public class GetNextBrokerIdResponseHeader implements CommandCustomHeader {
public void checkFields() throws RemotingCommandException {
}
+
+ /**
+ * Returns the next broker id carried by this response header.
+ */
+ public Long getNextBrokerId() {
+ return nextBrokerId;
+ }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java
similarity index 73%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
copy to remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java
index e8e1ea944..721509ac9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java
@@ -20,15 +20,22 @@ package org.apache.rocketmq.remoting.protocol.header.controller.register;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class ApplyBrokerIdRequestHeader implements CommandCustomHeader {
+public class RegisterSuccessRequestHeader implements CommandCustomHeader {
private String clusterName;
private String brokerName;
- private Long appliedBrokerId;
+ private Long brokerId;
- private String registerCheckCode;
+ private String brokerAddress;
+
+    /**
+     * No-arg constructor. Declaring the parameterized constructor below removes the implicit
+     * default constructor, so it is restored explicitly.
+     * NOTE(review): header classes are expected to be instantiated reflectively through a no-arg
+     * constructor when a command is decoded — confirm against RemotingCommand#decodeCommandCustomHeader.
+     */
+    public RegisterSuccessRequestHeader() {
+    }
+
+    /**
+     * Creates a fully populated request header.
+     *
+     * @param clusterName   name of the cluster the broker belongs to
+     * @param brokerName    name of the broker-set
+     * @param brokerId      broker id of the registering broker
+     * @param brokerAddress address of the registering broker
+     */
+    public RegisterSuccessRequestHeader(String clusterName, String brokerName, Long brokerId, String brokerAddress) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.brokerId = brokerId;
+        this.brokerAddress = brokerAddress;
+    }
@Override
public void checkFields() throws RemotingCommandException {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java
similarity index 83%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
copy to remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java
index e8e1ea944..953190480 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java
@@ -20,18 +20,11 @@ package org.apache.rocketmq.remoting.protocol.header.controller.register;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class ApplyBrokerIdRequestHeader implements CommandCustomHeader {
-
- private String clusterName;
-
- private String brokerName;
-
- private Long appliedBrokerId;
-
- private String registerCheckCode;
+public class RegisterSuccessResponseHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
}
+
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index e29fdc2b0..0a722c765 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -41,6 +41,10 @@ public class MessageStoreConfig {
private String storePathEpochFile = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "epochFileCheckpoint";
+ private String storePathMetadata = storePathRootDir + File.separator + "metadata";
+
+ private String storePathTempMetadata = storePathRootDir + File.separator + "metadata-temp";
+
private String readOnlyCommitLogStorePaths = null;
// CommitLog file size,default is 1G
@@ -627,6 +631,22 @@ public class MessageStoreConfig {
this.storePathEpochFile = storePathEpochFile;
}
+ /**
+ * Returns the configured path of the broker metadata file.
+ * NOTE(review): the default value is built from storePathRootDir once, when the field is initialized,
+ * so it presumably does not follow a storePathRootDir that is changed afterwards — confirm.
+ */
+ public String getStorePathMetadata() {
+ return storePathMetadata;
+ }
+
+ /**
+ * Overrides the path of the broker metadata file.
+ */
+ public void setStorePathMetadata(String storePathMetadata) {
+ this.storePathMetadata = storePathMetadata;
+ }
+
+ /**
+ * Returns the configured path of the temp broker metadata file.
+ * NOTE(review): the default value is built from storePathRootDir once, when the field is initialized,
+ * so it presumably does not follow a storePathRootDir that is changed afterwards — confirm.
+ */
+ public String getStorePathTempMetadata() {
+ return storePathTempMetadata;
+ }
+
+ /**
+ * Overrides the path of the temp broker metadata file.
+ */
+ public void setStorePathTempMetadata(String storePathTempMetadata) {
+ this.storePathTempMetadata = storePathTempMetadata;
+ }
+
public String getDeleteWhen() {
return deleteWhen;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java
new file mode 100644
index 000000000..24f400204
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Persistent identity of a broker (cluster name, broker name and broker id), kept in memory and
+ * stored in the backing file as {@code clusterName;brokerName;brokerId}.
+ */
+public class BrokerMetadata extends MetadataFile {
+
+    /** Number of ';'-separated fields in the encoded form. */
+    private static final int FIELD_COUNT = 3;
+
+    private String clusterName;
+
+    private String brokerName;
+
+    private Long brokerId;
+
+    /**
+     * Creates an empty instance; nothing is read from disk here.
+     *
+     * @param filePath location of the backing file
+     */
+    public BrokerMetadata(String filePath) {
+        this.filePath = filePath;
+    }
+
+    /**
+     * Replaces the in-memory identity and writes it to the backing file.
+     *
+     * @param clusterName cluster name to store
+     * @param brokerName broker name to store
+     * @param brokerId broker id to store
+     * @throws Exception if the file cannot be written
+     */
+    public void updateAndPersist(String clusterName, String brokerName, Long brokerId) throws Exception {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.brokerId = brokerId;
+        writeToFile();
+    }
+
+    /**
+     * @return the fields joined as {@code clusterName;brokerName;brokerId}; unset fields render as "null"
+     */
+    @Override
+    public String encodeToStr() {
+        return clusterName + ";" + brokerName + ";" + brokerId;
+    }
+
+    /**
+     * Parses {@code clusterName;brokerName;brokerId}. A {@code null} input is ignored. Fields are
+     * assigned only after the whole record has been parsed, so malformed content leaves the
+     * current state untouched.
+     *
+     * @param dataStr encoded metadata, or {@code null}
+     * @throws IllegalArgumentException if fewer than three fields are present or brokerId is not a number
+     */
+    @Override
+    public void decodeFromStr(String dataStr) {
+        if (dataStr == null) {
+            return;
+        }
+        String[] dataArr = dataStr.split(";");
+        if (dataArr.length < FIELD_COUNT) {
+            throw new IllegalArgumentException("Malformed broker metadata: expected " + FIELD_COUNT
+                + " fields separated by ';' but found " + dataArr.length);
+        }
+        // Parse first: Long.valueOf may throw and must not leave a half-updated object behind.
+        Long parsedBrokerId = Long.valueOf(dataArr[2]);
+        this.clusterName = dataArr[0];
+        this.brokerName = dataArr[1];
+        this.brokerId = parsedBrokerId;
+    }
+
+    /**
+     * @return {@code true} when cluster name and broker name are non-empty and a broker id is set
+     */
+    @Override
+    public boolean isLoaded() {
+        return StringUtils.isNotEmpty(this.clusterName) && StringUtils.isNotEmpty(this.brokerName) && brokerId != null;
+    }
+
+    @Override
+    public void clearInMem() {
+        this.clusterName = null;
+        this.brokerName = null;
+        this.brokerId = null;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public Long getBrokerId() {
+        return brokerId;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java
new file mode 100644
index 000000000..2e3c3ba99
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import org.apache.rocketmq.common.MixAll;
+
+import java.io.File;
+
+/**
+ * Base class for metadata that is held in memory and mirrored to a single file at {@code filePath}.
+ * Subclasses define the textual encoding; this class reads, writes and deletes the file.
+ */
+public abstract class MetadataFile {
+
+    /** Location of the backing file; assigned by subclasses. */
+    protected String filePath;
+
+    /**
+     * @return the textual form of the in-memory state, as persisted by {@link #writeToFile()}
+     */
+    public abstract String encodeToStr();
+
+    /**
+     * Restores the in-memory state from the text obtained by {@link #readFromFile()}.
+     *
+     * @param dataStr content of the backing file as returned by {@code MixAll.file2String}
+     */
+    public abstract void decodeFromStr(String dataStr);
+
+    /**
+     * @return whether the in-memory state is populated
+     */
+    public abstract boolean isLoaded();
+
+    /** Resets the in-memory state; the backing file is handled separately by {@link #deleteFile()}. */
+    public abstract void clearInMem();
+
+    /**
+     * Removes any existing backing file and writes the current encoded state in its place.
+     *
+     * @throws Exception if writing the file fails
+     */
+    public void writeToFile() throws Exception {
+        deleteFile();
+        MixAll.string2File(encodeToStr(), this.filePath);
+    }
+
+    /**
+     * Loads the backing file and hands its content to {@link #decodeFromStr(String)}.
+     *
+     * @throws Exception if reading or decoding fails
+     */
+    public void readFromFile() throws Exception {
+        String dataStr = MixAll.file2String(filePath);
+        decodeFromStr(dataStr);
+    }
+
+    /**
+     * @return whether the backing file currently exists
+     */
+    public boolean fileExists() {
+        return new File(filePath).exists();
+    }
+
+    /**
+     * Deletes the backing file right away. A missing file is not treated as an error.
+     */
+    public void deleteFile() {
+        // File.deleteOnExit() only schedules removal for JVM shutdown: the stale file would stay
+        // for now, and whatever writeToFile() persisted afterwards would be wiped on exit.
+        new File(filePath).delete();
+    }
+
+    /** Drops both the in-memory state and the backing file. */
+    public void clear() {
+        clearInMem();
+        deleteFile();
+    }
+
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java
new file mode 100644
index 000000000..31b4aa5e8
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Broker identity together with a register check code, kept in memory and stored in the backing
+ * file as {@code clusterName;brokerName;brokerId;registerCheckCode}.
+ */
+public class TempBrokerMetadata extends MetadataFile {
+
+    /** Number of ';'-separated fields in the encoded form. */
+    private static final int FIELD_COUNT = 4;
+
+    private String clusterName;
+
+    private String brokerName;
+
+    private Long brokerId;
+
+    private String registerCheckCode;
+
+    /**
+     * Creates an empty instance; nothing is read from disk here.
+     *
+     * @param filePath location of the backing file
+     */
+    public TempBrokerMetadata(String filePath) {
+        this(filePath, null, null, null, null);
+    }
+
+    /**
+     * Creates an instance with the given in-memory state; nothing is written to disk here.
+     *
+     * @param filePath location of the backing file
+     * @param clusterName cluster name to hold
+     * @param brokerName broker name to hold
+     * @param brokerId broker id to hold
+     * @param registerCheckCode register check code to hold
+     */
+    public TempBrokerMetadata(String filePath, String clusterName, String brokerName, Long brokerId, String registerCheckCode) {
+        this.filePath = filePath;
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.brokerId = brokerId;
+        this.registerCheckCode = registerCheckCode;
+    }
+
+    /**
+     * Replaces the in-memory state and writes it to the backing file.
+     *
+     * @param clusterName cluster name to store
+     * @param brokerName broker name to store
+     * @param brokerId broker id to store
+     * @param registerCheckCode register check code to store
+     * @throws Exception if the file cannot be written
+     */
+    public void updateAndPersist(String clusterName, String brokerName, Long brokerId, String registerCheckCode) throws Exception {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.brokerId = brokerId;
+        this.registerCheckCode = registerCheckCode;
+        writeToFile();
+    }
+
+    /**
+     * @return the fields joined with ';' in declaration order; unset fields render as "null"
+     */
+    @Override
+    public String encodeToStr() {
+        return clusterName + ";" + brokerName + ";" + brokerId + ";" + registerCheckCode;
+    }
+
+    /**
+     * Parses {@code clusterName;brokerName;brokerId;registerCheckCode}. A {@code null} input is
+     * ignored. Fields are assigned only after the whole record has been parsed, so malformed
+     * content leaves the current state untouched.
+     *
+     * @param dataStr encoded metadata, or {@code null}
+     * @throws IllegalArgumentException if fewer than four fields are present or brokerId is not a number
+     */
+    @Override
+    public void decodeFromStr(String dataStr) {
+        if (dataStr == null) {
+            return;
+        }
+        String[] dataArr = dataStr.split(";");
+        if (dataArr.length < FIELD_COUNT) {
+            throw new IllegalArgumentException("Malformed temp broker metadata: expected " + FIELD_COUNT
+                + " fields separated by ';' but found " + dataArr.length);
+        }
+        // Parse first: Long.valueOf may throw and must not leave a half-updated object behind.
+        Long parsedBrokerId = Long.valueOf(dataArr[2]);
+        this.clusterName = dataArr[0];
+        this.brokerName = dataArr[1];
+        this.brokerId = parsedBrokerId;
+        this.registerCheckCode = dataArr[3];
+    }
+
+    /**
+     * @return {@code true} when cluster name, broker name and check code are non-empty and a broker id is set
+     */
+    @Override
+    public boolean isLoaded() {
+        return StringUtils.isNotEmpty(this.clusterName) && StringUtils.isNotEmpty(this.brokerName) && brokerId != null && StringUtils.isNotEmpty(this.registerCheckCode);
+    }
+
+    @Override
+    public void clearInMem() {
+        this.clusterName = null;
+        this.brokerName = null;
+        this.brokerId = null;
+        this.registerCheckCode = null;
+    }
+
+    public Long getBrokerId() {
+        return brokerId;
+    }
+
+    public String getRegisterCheckCode() {
+        return registerCheckCode;
+    }
+}