You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/04/30 12:29:58 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of Code] Dledger controller implementation (#4195)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new d26773a9a [Summer of Code] Dledger controller implementation (#4195)
d26773a9a is described below
commit d26773a9ab181ce825dc465c900840f48e1ec6b8
Author: hzh0425 <58...@users.noreply.github.com>
AuthorDate: Sat Apr 30 20:29:53 2022 +0800
[Summer of Code] Dledger controller implementation (#4195)
* feature: initial, add controllerApi, event, request and response
* feature: Apply event in ReplicasInfoManager
* feature: Done the work in ReplicasInfoManager
next step: try test
* feature: Add some test for ReplicasInfoManager
* feature: Build the architecture of dledgerController
* feature: Done the work in controller
* style: review code
* feature: add controllerProcessor in name-srv
* style: use defensive copy in constructor;
* style: review code
* style: review code
* feature: let controller api return RemotingCommand
* feature:
1.remove originMasterId in replicasInfo
2.add DledgerControllerConfig
* feature:
1.add option isProcessReadEvent.
2.add ControllerConfig
* feature:
add namesrv into dledgerController to predict whether the broker is alive.
* style: code review
* feature: process initial log when controller become leader
* style: review code
* style: review code
* style: review code
* style: change version
* fixbug
---
.../rocketmq/common/constant/LoggerName.java | 1 +
.../rocketmq/common/namesrv/ControllerConfig.java | 133 +++++++
.../rocketmq/common/protocol/RequestCode.java | 13 +
.../rocketmq/common/protocol/ResponseCode.java | 11 +
.../controller/AlterSyncStateSetRequestHeader.java | 74 ++++
.../AlterSyncStateSetResponseHeader.java | 58 +++
.../controller/ElectMasterRequestHeader.java | 43 +++
.../controller/ElectMasterResponseHeader.java | 56 +++
.../controller/GetMetaDataResponseHeader.java | 49 +++
.../controller/GetReplicaInfoRequestHeader.java | 43 +++
.../controller/GetReplicaInfoResponseHeader.java | 77 ++++
.../controller/RegisterBrokerRequestHeader.java | 57 +++
.../controller/RegisterBrokerResponseHeader.java | 67 ++++
.../rocketmq/common/utils/FastJsonSerializer.java | 62 ++++
.../apache/rocketmq/common/utils/Serializer.java | 35 ++
namesrv/pom.xml | 5 +
.../apache/rocketmq/namesrv/NamesrvController.java | 55 ++-
.../apache/rocketmq/namesrv/NamesrvStartup.java | 6 +-
.../rocketmq/namesrv/controller/Controller.java | 90 +++++
.../namesrv/controller/impl/DledgerController.java | 413 +++++++++++++++++++++
.../impl/DledgerControllerStateMachine.java | 75 ++++
.../namesrv/controller/manager/BrokerIdInfo.java | 54 +++
.../controller/manager/InSyncReplicasInfo.java | 82 ++++
.../controller/manager/ReplicasInfoManager.java | 346 +++++++++++++++++
.../manager/event/AlterSyncStateSetEvent.java | 55 +++
.../manager/event/ApplyBrokerIdEvent.java | 58 +++
.../controller/manager/event/ControllerResult.java | 68 ++++
.../controller/manager/event/ElectMasterEvent.java | 75 ++++
.../controller/manager/event/EventMessage.java | 28 ++
.../controller/manager/event/EventSerializer.java | 77 ++++
.../controller/manager/event/EventType.java | 57 +++
.../processor/ControllerRequestProcessor.java | 109 ++++++
.../namesrv/routeinfo/RouteInfoManager.java | 51 ++-
.../controller/impl/DledgerControllerTest.java | 259 +++++++++++++
.../manager/ReplicasInfoManagerTest.java | 137 +++++++
35 files changed, 2862 insertions(+), 17 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
index a77d5c2d2..fb3a9b85d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.common.constant;
public class LoggerName {
public static final String FILTERSRV_LOGGER_NAME = "RocketmqFiltersrv";
public static final String NAMESRV_LOGGER_NAME = "RocketmqNamesrv";
+ public static final String CONTROLLER_LOGGER_NAME = "RocketmqController";
public static final String NAMESRV_WATER_MARK_LOGGER_NAME = "RocketmqNamesrvWaterMark";
public static final String BROKER_LOGGER_NAME = "RocketmqBroker";
public static final String BROKER_CONSOLE_NAME = "RocketmqConsole";
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java
new file mode 100644
index 000000000..4d4525aef
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.namesrv;
+
+import java.io.File;
+
+public class ControllerConfig {
+
+ /**
+ * Is startup the controller in this name-srv
+ */
+ private boolean isStartupController = false;
+
+ /**
+ * Indicates the nums of thread to handle broker or operation requests, like REGISTER_BROKER.
+ */
+ private int controllerThreadPoolNums = 16;
+
+ /**
+ * Indicates the capacity of queue to hold client requests.
+ */
+ private int controllerRequestThreadPoolQueueCapacity = 50000;
+
+ private String controllerDLegerGroup;
+ private String controllerDLegerPeers;
+ private String controllerDLegerSelfId;
+ private int mappedFileSize = 1024 * 1024 * 1024;
+ private String controllerStorePath = System.getProperty("user.home") + File.separator + "DledgerController";
+
+ /**
+ * Whether the controller can elect a master which is not in the syncStateSet.
+ */
+ private boolean enableElectUncleanMaster = false;
+
+ /**
+ * Whether process read event
+ */
+ private boolean isProcessReadEvent = false;
+
+ public boolean isStartupController() {
+ return isStartupController;
+ }
+
+ public void setStartupController(boolean startupController) {
+ isStartupController = startupController;
+ }
+
+ public int getControllerThreadPoolNums() {
+ return controllerThreadPoolNums;
+ }
+
+ public void setControllerThreadPoolNums(int controllerThreadPoolNums) {
+ this.controllerThreadPoolNums = controllerThreadPoolNums;
+ }
+
+ public int getControllerRequestThreadPoolQueueCapacity() {
+ return controllerRequestThreadPoolQueueCapacity;
+ }
+
+ public void setControllerRequestThreadPoolQueueCapacity(int controllerRequestThreadPoolQueueCapacity) {
+ this.controllerRequestThreadPoolQueueCapacity = controllerRequestThreadPoolQueueCapacity;
+ }
+
+ public String getControllerDLegerGroup() {
+ return controllerDLegerGroup;
+ }
+
+ public void setControllerDLegerGroup(String controllerDLegerGroup) {
+ this.controllerDLegerGroup = controllerDLegerGroup;
+ }
+
+ public String getControllerDLegerPeers() {
+ return controllerDLegerPeers;
+ }
+
+ public void setControllerDLegerPeers(String controllerDLegerPeers) {
+ this.controllerDLegerPeers = controllerDLegerPeers;
+ }
+
+ public String getControllerDLegerSelfId() {
+ return controllerDLegerSelfId;
+ }
+
+ public void setControllerDLegerSelfId(String controllerDLegerSelfId) {
+ this.controllerDLegerSelfId = controllerDLegerSelfId;
+ }
+
+ public int getMappedFileSize() {
+ return mappedFileSize;
+ }
+
+ public void setMappedFileSize(int mappedFileSize) {
+ this.mappedFileSize = mappedFileSize;
+ }
+
+ public String getControllerStorePath() {
+ return controllerStorePath;
+ }
+
+ public void setControllerStorePath(String controllerStorePath) {
+ this.controllerStorePath = controllerStorePath;
+ }
+
+ public boolean isEnableElectUncleanMaster() {
+ return enableElectUncleanMaster;
+ }
+
+ public void setEnableElectUncleanMaster(boolean enableElectUncleanMaster) {
+ this.enableElectUncleanMaster = enableElectUncleanMaster;
+ }
+
+ public boolean isProcessReadEvent() {
+ return isProcessReadEvent;
+ }
+
+ public void setProcessReadEvent(boolean processReadEvent) {
+ isProcessReadEvent = processReadEvent;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 541227f4a..c6048ecbf 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -236,4 +236,17 @@ public class RequestCode {
public static final int GET_BROKER_HA_STATUS = 907;
public static final int RESET_MASTER_FLUSH_OFFSET = 908;
+
+ /**
+ * Controller code
+ */
+ public static final int CONTROLLER_ALTER_SYNC_STATE_SET = 1001;
+
+ public static final int CONTROLLER_ELECT_MASTER = 1002;
+
+ public static final int CONTROLLER_REGISTER_BROKER = 1003;
+
+ public static final int CONTROLLER_GET_REPLICA_INFO = 1004;
+
+ public static final int CONTROLLER_GET_METADATA_INFO = 1005;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index db2a275e3..a947854c4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -97,5 +97,16 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int RPC_SEND_TO_CHANNEL_FAILED = -1004;
public static final int RPC_TIME_OUT = -1006;
+ /**
+ * Controller response code
+ */
+ public static final int CONTROLLER_FENCED_MASTER_EPOCH = 2000;
+ public static final int CONTROLLER_FENCED_SYNC_STATE_SET_EPOCH = 2001;
+ public static final int CONTROLLER_INVALID_MASTER = 2002;
+ public static final int CONTROLLER_INVALID_REPLICAS = 2003;
+ public static final int CONTROLLER_MASTER_NOT_AVAILABLE = 2004;
+ public static final int CONTROLLER_INVALID_REQUEST = 2005;
+ public static final int CONTROLLER_BROKER_NOT_ALIVE = 2006;
+ public static final int CONTROLLER_NOT_LEADER = 2007;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/AlterSyncStateSetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/AlterSyncStateSetRequestHeader.java
new file mode 100644
index 000000000..3dc396e96
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/AlterSyncStateSetRequestHeader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class AlterSyncStateSetRequestHeader implements CommandCustomHeader {
+ private final String brokerName;
+ private final String masterAddress;
+ private final int masterEpoch;
+ private final Set<String> newSyncStateSet;
+ private final int syncStateSetEpoch;
+
+ public AlterSyncStateSetRequestHeader(String brokerName, String masterAddress, int masterEpoch,
+ Set<String> newSyncStateSet, int syncStateSetEpoch) {
+ this.brokerName = brokerName;
+ this.masterAddress = masterAddress;
+ this.masterEpoch = masterEpoch;
+ this.newSyncStateSet = new HashSet<>(newSyncStateSet);
+ this.syncStateSetEpoch = syncStateSetEpoch;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public String getMasterAddress() {
+ return masterAddress;
+ }
+
+ public int getMasterEpoch() {
+ return masterEpoch;
+ }
+
+ public Set<String> getNewSyncStateSet() {
+ return new HashSet<>(newSyncStateSet);
+ }
+
+ public int getSyncStateSetEpoch() {
+ return syncStateSetEpoch;
+ }
+
+ @Override
+ public String toString() {
+ return "AlterSyncStateSetRequestHeader{" +
+ "brokerName='" + brokerName + '\'' +
+ ", masterAddress='" + masterAddress + '\'' +
+ ", masterEpoch=" + masterEpoch +
+ ", newSyncStateSet=" + newSyncStateSet +
+ ", syncStateSetEpoch=" + syncStateSetEpoch +
+ '}';
+ }
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/AlterSyncStateSetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/AlterSyncStateSetResponseHeader.java
new file mode 100644
index 000000000..8ab55552a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/AlterSyncStateSetResponseHeader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class AlterSyncStateSetResponseHeader implements CommandCustomHeader {
+ private Set<String> newSyncStateSet;
+ private int newSyncStateSetEpoch;
+
+ public AlterSyncStateSetResponseHeader() {
+ }
+
+ public Set<String> getNewSyncStateSet() {
+ return new HashSet<>(newSyncStateSet);
+ }
+
+ public void setNewSyncStateSet(Set<String> newSyncStateSet) {
+ this.newSyncStateSet = new HashSet<>(newSyncStateSet);
+ }
+
+ public int getNewSyncStateSetEpoch() {
+ return newSyncStateSetEpoch;
+ }
+
+ public void setNewSyncStateSetEpoch(int newSyncStateSetEpoch) {
+ this.newSyncStateSetEpoch = newSyncStateSetEpoch;
+ }
+
+ @Override
+ public String toString() {
+ return "AlterSyncStateSetResponseHeader{" +
+ "newSyncStateSet=" + newSyncStateSet +
+ ", newSyncStateSetEpoch=" + newSyncStateSetEpoch +
+ '}';
+ }
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterRequestHeader.java
new file mode 100644
index 000000000..430353bd2
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterRequestHeader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class ElectMasterRequestHeader implements CommandCustomHeader {
+ private final String brokerName;
+
+ public ElectMasterRequestHeader(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ @Override
+ public String toString() {
+ return "ElectMasterRequestHeader{" +
+ "brokerName='" + brokerName + '\'' +
+ '}';
+ }
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterResponseHeader.java
new file mode 100644
index 000000000..1adcfe3b7
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterResponseHeader.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class ElectMasterResponseHeader implements CommandCustomHeader {
+ private String newMasterAddress;
+ private int masterEpoch;
+
+ public ElectMasterResponseHeader() {
+ }
+
+ public String getNewMasterAddress() {
+ return newMasterAddress;
+ }
+
+ public void setNewMasterAddress(String newMasterAddress) {
+ this.newMasterAddress = newMasterAddress;
+ }
+
+ public int getMasterEpoch() {
+ return masterEpoch;
+ }
+
+ public void setMasterEpoch(int masterEpoch) {
+ this.masterEpoch = masterEpoch;
+ }
+
+ @Override
+ public String toString() {
+ return "ElectMasterResponseHeader{" +
+ "newMasterAddress='" + newMasterAddress + '\'' +
+ ", masterEpoch=" + masterEpoch +
+ '}';
+ }
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetMetaDataResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetMetaDataResponseHeader.java
new file mode 100644
index 000000000..5c8290fa3
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetMetaDataResponseHeader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class GetMetaDataResponseHeader implements CommandCustomHeader {
+ private final String controllerLeaderId;
+ private final String controllerLeaderAddress;
+
+ public GetMetaDataResponseHeader(String controllerLeaderId, String controllerLeaderAddress) {
+ this.controllerLeaderId = controllerLeaderId;
+ this.controllerLeaderAddress = controllerLeaderAddress;
+ }
+
+ public String getControllerLeaderId() {
+ return controllerLeaderId;
+ }
+
+ public String getControllerLeaderAddress() {
+ return controllerLeaderAddress;
+ }
+
+ @Override public String toString() {
+ return "GetMetaDataResponseHeader{" +
+ "controllerLeaderId='" + controllerLeaderId + '\'' +
+ ", controllerLeaderAddress='" + controllerLeaderAddress + '\'' +
+ '}';
+ }
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoRequestHeader.java
new file mode 100644
index 000000000..e90086a76
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoRequestHeader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class GetReplicaInfoRequestHeader implements CommandCustomHeader {
+ private final String brokerName;
+
+ public GetReplicaInfoRequestHeader(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ @Override
+ public String toString() {
+ return "GetReplicaInfoRequestHeader{" +
+ "brokerName='" + brokerName + '\'' +
+ '}';
+ }
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoResponseHeader.java
new file mode 100644
index 000000000..e135d0f0f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoResponseHeader.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import java.util.Set;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class GetReplicaInfoResponseHeader implements CommandCustomHeader {
+ private String masterAddress;
+ private int masterEpoch;
+ private Set<String> syncStateSet;
+ private int syncStateSetEpoch;
+
+ public GetReplicaInfoResponseHeader() {
+ }
+
+ public String getMasterAddress() {
+ return masterAddress;
+ }
+
+ public void setMasterAddress(String masterAddress) {
+ this.masterAddress = masterAddress;
+ }
+
+ public int getMasterEpoch() {
+ return masterEpoch;
+ }
+
+ public void setMasterEpoch(int masterEpoch) {
+ this.masterEpoch = masterEpoch;
+ }
+
+ public Set<String> getSyncStateSet() {
+ return syncStateSet;
+ }
+
+ public void setSyncStateSet(Set<String> syncStateSet) {
+ this.syncStateSet = syncStateSet;
+ }
+
+ public int getSyncStateSetEpoch() {
+ return syncStateSetEpoch;
+ }
+
+ public void setSyncStateSetEpoch(int syncStateSetEpoch) {
+ this.syncStateSetEpoch = syncStateSetEpoch;
+ }
+
+ @Override
+ public String toString() {
+ return "GetReplicaInfoResponseHeader{" +
+ "masterAddress='" + masterAddress + '\'' +
+ ", masterEpoch=" + masterEpoch +
+ ", syncStateSet=" + syncStateSet +
+ ", syncStateSetEpoch=" + syncStateSetEpoch +
+ '}';
+ }
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerRequestHeader.java
new file mode 100644
index 000000000..8c9439932
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerRequestHeader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class RegisterBrokerRequestHeader implements CommandCustomHeader {
+ private final String clusterName;
+ private final String brokerName;
+ private final String brokerAddress;
+
+ public RegisterBrokerRequestHeader(String clusterName, String brokerName, String brokerAddress) {
+ this.clusterName = clusterName;
+ this.brokerName = brokerName;
+ this.brokerAddress = brokerAddress;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public String getBrokerAddress() {
+ return brokerAddress;
+ }
+
+ @Override
+ public String toString() {
+ return "RegisterBrokerRequestHeader{" +
+ "clusterName='" + clusterName + '\'' +
+ ", brokerName='" + brokerName + '\'' +
+ ", brokerAddress='" + brokerAddress + '\'' +
+ '}';
+ }
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerResponseHeader.java
new file mode 100644
index 000000000..97fbb1057
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerResponseHeader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class RegisterBrokerResponseHeader implements CommandCustomHeader {
+ private String masterAddress;
+ private int masterEpoch;
+ // The id of this registered replicas.
+ private long brokerId;
+
+ public RegisterBrokerResponseHeader() {
+ }
+
+ public String getMasterAddress() {
+ return masterAddress;
+ }
+
+ public void setMasterAddress(String masterAddress) {
+ this.masterAddress = masterAddress;
+ }
+
+ public int getMasterEpoch() {
+ return masterEpoch;
+ }
+
+ public void setMasterEpoch(int masterEpoch) {
+ this.masterEpoch = masterEpoch;
+ }
+
+ public long getBrokerId() {
+ return brokerId;
+ }
+
+ public void setBrokerId(long brokerId) {
+ this.brokerId = brokerId;
+ }
+
+ @Override
+ public String toString() {
+ return "RegisterBrokerResponseHeader{" +
+ "masterAddress='" + masterAddress + '\'' +
+ ", masterEpoch=" + masterEpoch +
+ ", brokerId=" + brokerId +
+ '}';
+ }
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/FastJsonSerializer.java b/common/src/main/java/org/apache/rocketmq/common/utils/FastJsonSerializer.java
new file mode 100644
index 000000000..805558d1d
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/FastJsonSerializer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.utils;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.support.config.FastJsonConfig;
+import org.apache.commons.lang3.SerializationException;
+
+/**
+ * The object serializer based on fastJson
+ */
+public class FastJsonSerializer implements Serializer {
+ private FastJsonConfig fastJsonConfig = new FastJsonConfig();
+
+ public FastJsonConfig getFastJsonConfig() {
+ return this.fastJsonConfig;
+ }
+
+ public void setFastJsonConfig(FastJsonConfig fastJsonConfig) {
+ this.fastJsonConfig = fastJsonConfig;
+ }
+
+ @Override
+ public <T> byte[] serialize(T t) throws SerializationException {
+ if (t == null) {
+ return new byte[0];
+ } else {
+ try {
+ return JSON.toJSONBytesWithFastJsonConfig(this.fastJsonConfig.getCharset(), t, this.fastJsonConfig.getSerializeConfig(), this.fastJsonConfig.getSerializeFilters(), this.fastJsonConfig.getDateFormat(), JSON.DEFAULT_GENERATE_FEATURE, this.fastJsonConfig.getSerializerFeatures());
+ } catch (Exception var3) {
+ throw new SerializationException("Could not serialize: " + var3.getMessage(), var3);
+ }
+ }
+ }
+
+ @Override
+ public <T> T deserialize(byte[] bytes, Class<T> type) throws SerializationException {
+ if (bytes != null && bytes.length != 0) {
+ try {
+ return JSON.parseObject(bytes, this.fastJsonConfig.getCharset(), type, this.fastJsonConfig.getParserConfig(), this.fastJsonConfig.getParseProcess(), JSON.DEFAULT_PARSER_FEATURE, this.fastJsonConfig.getFeatures());
+ } catch (Exception var3) {
+ throw new SerializationException("Could not deserialize: " + var3.getMessage(), var3);
+ }
+ } else {
+ return null;
+ }
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/Serializer.java b/common/src/main/java/org/apache/rocketmq/common/utils/Serializer.java
new file mode 100644
index 000000000..a98d2454d
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/Serializer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.utils;
+
+import org.apache.commons.lang3.SerializationException;
+
+/**
+ * Serializer
+ */
+public interface Serializer {
+
+ /**
+ * Serialize object t to byte[]
+ */
+ <T> byte[] serialize(T t) throws SerializationException;
+
+ /**
+ * De-serialize bytes to T
+ */
+ <T> T deserialize(byte[] bytes, Class<T> type) throws SerializationException;
+}
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index 0b04d5014..83b11d1c5 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -28,6 +28,11 @@
<name>rocketmq-namesrv ${project.version}</name>
<dependencies>
+ <dependency>
+ <groupId>io.openmessaging.storage</groupId>
+ <artifactId>dledger</artifactId>
+ <version>0.2.5</version>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-client</artifactId>
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index d02e6c0ad..e10878e6b 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -30,13 +30,17 @@ import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.namesrv.controller.impl.DledgerController;
import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager;
import org.apache.rocketmq.namesrv.processor.ClientRequestProcessor;
import org.apache.rocketmq.namesrv.processor.ClusterTestRequestProcessor;
+import org.apache.rocketmq.namesrv.processor.ControllerRequestProcessor;
import org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor;
import org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService;
import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
@@ -52,7 +56,6 @@ import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.srvutil.FileWatchService;
-
public class NamesrvController {
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private static final InternalLogger WATER_MARK_LOG = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_WATER_MARK_LOGGER_NAME);
@@ -61,6 +64,7 @@ public class NamesrvController {
private final NettyServerConfig nettyServerConfig;
private final NettyClientConfig nettyClientConfig;
+ private final ControllerConfig controllerConfig;
private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder()
@@ -82,24 +86,33 @@ public class NamesrvController {
private ExecutorService defaultExecutor;
private ExecutorService clientRequestExecutor;
+ private ExecutorService controllerRequestExecutor;
private BlockingQueue<Runnable> defaultThreadPoolQueue;
private BlockingQueue<Runnable> clientRequestThreadPoolQueue;
+ private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
private Configuration configuration;
private FileWatchService fileWatchService;
+ private Controller controller;
+
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
- this(namesrvConfig, nettyServerConfig, new NettyClientConfig());
+ this(namesrvConfig, nettyServerConfig, new NettyClientConfig(), new ControllerConfig());
}
- public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) {
+ public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig,
+ NettyClientConfig nettyClientConfig, ControllerConfig controllerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
+ this.controllerConfig = controllerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
- this.routeInfoManager = new RouteInfoManager(namesrvConfig, this);
+ if (controllerConfig.isStartupController()) {
+ this.controller = new DledgerController(controllerConfig, this);
+ }
+ this.routeInfoManager = new RouteInfoManager(namesrvConfig, this, this.controller);
this.configuration = new Configuration(
LOGGER,
this.namesrvConfig, this.nettyServerConfig
@@ -140,6 +153,24 @@ public class NamesrvController {
return new FutureTaskExt<T>(runnable, value);
}
};
+
+ if (this.controllerConfig.isStartupController()) {
+ this.controllerRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
+ this.controllerRequestExecutor = new ThreadPoolExecutor(
+ this.controllerConfig.getControllerThreadPoolNums(),
+ this.controllerConfig.getControllerThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.controllerRequestThreadPoolQueue,
+ new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
+ return new FutureTaskExt<T>(runnable, value);
+ }
+ };
+ this.controller.startup();
+ }
+
this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);
this.remotingClient.updateNameServerAddressList(Arrays.asList(RemotingUtil.getLocalAddress() + ":" + this.nettyServerConfig.getListenPort()));
@@ -229,7 +260,6 @@ public class NamesrvController {
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
-
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.defaultExecutor);
} else {
@@ -238,6 +268,15 @@ public class NamesrvController {
this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
+
+ if (controllerConfig.isStartupController()) {
+ final ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this.controller);
+ this.remotingServer.registerProcessor(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, controllerRequestProcessor, this.controllerRequestExecutor);
+ this.remotingServer.registerProcessor(RequestCode.CONTROLLER_ELECT_MASTER, controllerRequestProcessor, this.controllerRequestExecutor);
+ this.remotingServer.registerProcessor(RequestCode.CONTROLLER_REGISTER_BROKER, controllerRequestProcessor, this.controllerRequestExecutor);
+ this.remotingServer.registerProcessor(RequestCode.CONTROLLER_GET_REPLICA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
+ this.remotingServer.registerProcessor(RequestCode.CONTROLLER_GET_METADATA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
+ }
}
}
@@ -274,6 +313,10 @@ public class NamesrvController {
return nettyServerConfig;
}
+ public ControllerConfig getControllerConfig() {
+ return controllerConfig;
+ }
+
public KVConfigManager getKvConfigManager() {
return kvConfigManager;
}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
index 715f23872..c580a5718 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
@@ -32,6 +32,7 @@ import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
@@ -84,6 +85,7 @@ public class NamesrvStartup {
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyServerConfig.setListenPort(9876);
+ final ControllerConfig controllerConfig = new ControllerConfig();
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
@@ -93,6 +95,7 @@ public class NamesrvStartup {
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
+ MixAll.properties2Object(properties, controllerConfig);
namesrvConfig.setConfigStorePath(file);
@@ -105,6 +108,7 @@ public class NamesrvStartup {
MixAll.printObjectProperties(null, namesrvConfig);
MixAll.printObjectProperties(null, nettyServerConfig);
MixAll.printObjectProperties(null, nettyClientConfig);
+ MixAll.printObjectProperties(null, controllerConfig);
System.exit(0);
}
@@ -126,7 +130,7 @@ public class NamesrvStartup {
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
- final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig, nettyClientConfig);
+ final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig, nettyClientConfig, controllerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/Controller.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/Controller.java
new file mode 100644
index 000000000..4266942b2
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/Controller.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * The api for controller
+ */
+public interface Controller {
+
+ /**
+ * Startup controller
+ */
+ void startup();
+
+ /**
+ * Shutdown controller
+ */
+ void shutdown();
+
+ /**
+ * Start scheduling controller events, this function only will be triggered when the controller becomes leader.
+ */
+ void startScheduling();
+
+ /**
+ * Stop scheduling controller events, this function only will be triggered when the controller shutdown leaderShip.
+ */
+ void stopScheduling();
+
+ /**
+ * Alter ISR of broker replicas.
+ *
+ * @param request AlterSyncStateSetRequestHeader
+ * @return RemotingCommand(AlterSyncStateSetResponseHeader)
+ */
+ CompletableFuture<RemotingCommand> alterSyncStateSet(
+ final AlterSyncStateSetRequestHeader request);
+
+ /**
+ * Elect new master for a broker.
+ *
+ * @param request ElectMasterRequest
+ * @return RemotingCommand(ElectMasterResponseHeader)
+ */
+ CompletableFuture<RemotingCommand> electMaster(final ElectMasterRequestHeader request);
+
+ /**
+ * Register api when a replicas of a broker startup.
+ *
+ * @param request RegisterBrokerRequest
+ * @return RemotingCommand(RegisterBrokerResponseHeader)
+ */
+ CompletableFuture<RemotingCommand> registerBroker(final RegisterBrokerRequestHeader request);
+
+ /**
+ * Get the Replica Info for a target broker.
+ *
+ * @param request GetRouteInfoRequest
+ * @return RemotingCommand(GetReplicaInfoResponseHeader)
+ */
+ CompletableFuture<RemotingCommand> getReplicaInfo(final GetReplicaInfoRequestHeader request);
+
+ /**
+ * Get Metadata of controller
+ *
+ * @return RemotingCommand(GetControllerMetadataResponseHeader)
+ */
+ RemotingCommand getControllerMetadata();
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerController.java
new file mode 100644
index 000000000..533b8fbcd
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerController.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.impl;
+
+import io.openmessaging.storage.dledger.AppendFuture;
+import io.openmessaging.storage.dledger.DLedgerConfig;
+import io.openmessaging.storage.dledger.DLedgerLeaderElector;
+import io.openmessaging.storage.dledger.DLedgerServer;
+import io.openmessaging.storage.dledger.MemberState;
+import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
+import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
+import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.function.Supplier;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.namesrv.controller.manager.ReplicasInfoManager;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventSerializer;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * The implementation of controller, based on dledger (raft).
+ */
+public class DledgerController implements Controller {
+
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+ private final DLedgerServer dLedgerServer;
+ private final ControllerConfig controllerConfig;
+ private final DLedgerConfig dLedgerConfig;
+ private final NamesrvController namesrvController;
+ // Usr for checking whether the broker is alive
+ private final BiPredicate<String, String> brokerAlivePredicate;
+
+ private final ReplicasInfoManager replicasInfoManager;
+ private final EventScheduler scheduler;
+ private final EventSerializer eventSerializer;
+ private final RoleChangeHandler roleHandler;
+ private final DledgerControllerStateMachine statemachine;
+ private volatile boolean isScheduling = false;
+
+ public DledgerController(final ControllerConfig config, final NamesrvController namesrvController) {
+ this.controllerConfig = config;
+ this.eventSerializer = new EventSerializer();
+ this.scheduler = new EventScheduler();
+ this.namesrvController = namesrvController;
+ if (namesrvController == null) {
+ this.brokerAlivePredicate = (cluster, address) -> true;
+ } else {
+ this.brokerAlivePredicate = (cluster, address) -> namesrvController.getRouteInfoManager().isBrokerAlive(cluster, address);
+ }
+
+ this.dLedgerConfig = new DLedgerConfig();
+ this.dLedgerConfig.setGroup(config.getControllerDLegerGroup());
+ this.dLedgerConfig.setPeers(config.getControllerDLegerPeers());
+ this.dLedgerConfig.setSelfId(config.getControllerDLegerSelfId());
+ this.dLedgerConfig.setStoreBaseDir(config.getControllerStorePath());
+ this.dLedgerConfig.setMappedFileSizeForEntryData(config.getMappedFileSize());
+
+ this.roleHandler = new RoleChangeHandler(dLedgerConfig.getSelfId());
+ this.replicasInfoManager = new ReplicasInfoManager(config);
+ this.statemachine = new DledgerControllerStateMachine(replicasInfoManager, this.eventSerializer, dLedgerConfig.getSelfId());
+
+ // Register statemachine and role handler.
+ this.dLedgerServer = new DLedgerServer(dLedgerConfig);
+ this.dLedgerServer.registerStateMachine(this.statemachine);
+ this.dLedgerServer.getdLedgerLeaderElector().addRoleChangeHandler(this.roleHandler);
+ }
+
+ @Override
+ public void startup() {
+ this.dLedgerServer.startup();
+ }
+
+ @Override
+ public void shutdown() {
+ this.dLedgerServer.shutdown();
+ }
+
+ @Override
+ public void startScheduling() {
+ if (!this.isScheduling) {
+ log.info("Start scheduling controller events");
+ this.isScheduling = true;
+ this.scheduler.start();
+ }
+ }
+
+ @Override
+ public void stopScheduling() {
+ if (this.isScheduling) {
+ log.info("Stop scheduling controller events");
+ this.isScheduling = false;
+ this.scheduler.shutdown(true);
+ }
+ }
+
+ @Override
+ public CompletableFuture<RemotingCommand> alterSyncStateSet(AlterSyncStateSetRequestHeader request) {
+ return this.scheduler.appendEvent("alterSyncStateSet",
+ () -> this.replicasInfoManager.alterSyncStateSet(request, this.brokerAlivePredicate), true);
+ }
+
+ @Override
+ public CompletableFuture<RemotingCommand> electMaster(final ElectMasterRequestHeader request) {
+ return this.scheduler.appendEvent("electMaster",
+ () -> this.replicasInfoManager.electMaster(request, this.brokerAlivePredicate), true);
+ }
+
+ @Override
+ public CompletableFuture<RemotingCommand> registerBroker(RegisterBrokerRequestHeader request) {
+ return this.scheduler.appendEvent("registerBroker",
+ () -> this.replicasInfoManager.registerBroker(request), true);
+ }
+
+ @Override
+ public CompletableFuture<RemotingCommand> getReplicaInfo(final GetReplicaInfoRequestHeader request) {
+ return this.scheduler.appendEvent("getReplicaInfo",
+ () -> this.replicasInfoManager.getReplicaInfo(request), false);
+ }
+
+ @Override
+ public RemotingCommand getControllerMetadata() {
+ final MemberState state = getMemberState();
+ return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, new GetMetaDataResponseHeader(state.getLeaderId(), state.getLeaderAddr()));
+ }
+
+ /**
+ * Event handler that handle event
+ */
+ interface EventHandler<T> {
+ /**
+ * Run the controller event
+ */
+ void run() throws Throwable;
+
+ /**
+ * Return the completableFuture
+ */
+ CompletableFuture<RemotingCommand> future();
+
+ /**
+ * Handle Exception.
+ */
+ void handleException(final Throwable t);
+ }
+
+ /**
+ * Event scheduler, schedule event handler from event queue
+ */
+ class EventScheduler extends ServiceThread {
+ private final BlockingQueue<EventHandler> eventQueue;
+
+ public EventScheduler() {
+ this.eventQueue = new LinkedBlockingQueue<>(1024);
+ }
+
+ @Override
+ public String getServiceName() {
+ return EventScheduler.class.getName();
+ }
+
+ @Override
+ public void run() {
+ log.info("Start event scheduler.");
+ while (!isStopped()) {
+ EventHandler handler;
+ try {
+ handler = this.eventQueue.poll(5, TimeUnit.SECONDS);
+ } catch (final InterruptedException e) {
+ continue;
+ }
+ try {
+ if (handler != null) {
+ handler.run();
+ }
+ } catch (final Throwable e) {
+ handler.handleException(e);
+ }
+ }
+ }
+
+ public <T> CompletableFuture<RemotingCommand> appendEvent(final String name,
+ final Supplier<ControllerResult<T>> supplier, boolean isWriteEvent) {
+ if (isStopped() || !DledgerController.this.roleHandler.isLeaderState()) {
+ final RemotingCommand command = RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_NOT_LEADER, "The controller is not in leader state");
+ final CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+ future.complete(command);
+ return future;
+ }
+
+ final EventHandler<T> event = new ControllerEventHandler<>(name, supplier, isWriteEvent);
+ int tryTimes = 0;
+ while (true) {
+ try {
+ if (!this.eventQueue.offer(event, 5, TimeUnit.SECONDS)) {
+ continue;
+ }
+ return event.future();
+ } catch (final InterruptedException e) {
+ log.error("Error happen in EventScheduler when append event", e);
+ tryTimes++;
+ if (tryTimes > 3) {
+ return null;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Event handler, get events from supplier, and append events to dledger
+ */
+ class ControllerEventHandler<T> implements EventHandler<T> {
+ private final String name;
+ private final Supplier<ControllerResult<T>> supplier;
+ private final CompletableFuture<RemotingCommand> future;
+ private final boolean isWriteEvent;
+
+ ControllerEventHandler(final String name, final Supplier<ControllerResult<T>> supplier,
+ final boolean isWriteEvent) {
+ this.name = name;
+ this.supplier = supplier;
+ this.future = new CompletableFuture<>();
+ this.isWriteEvent = isWriteEvent;
+ }
+
+ @Override
+ public void run() throws Throwable {
+ final ControllerResult<T> result = this.supplier.get();
+ log.info("Event queue run event {}, get the result {}", this.name, result);
+ boolean appendSuccess = true;
+ if (this.isWriteEvent) {
+ final List<EventMessage> events = result.getEvents();
+ final List<byte[]> eventBytes = new ArrayList<>(events.size());
+ for (final EventMessage event : events) {
+ if (event != null) {
+ final byte[] data = DledgerController.this.eventSerializer.serialize(event);
+ if (data != null && data.length > 0) {
+ eventBytes.add(data);
+ }
+ }
+ }
+ // Append events to dledger
+ if (!eventBytes.isEmpty()) {
+ final BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+ request.setBatchMsgs(eventBytes);
+ appendSuccess = appendToDledgerAndWait(request);
+ }
+ } else {
+ if (DledgerController.this.controllerConfig.isProcessReadEvent()) {
+ // Now the dledger don't have the function of Read-Index or Lease-Read,
+ // So we still need to propose an empty request to dledger.
+ final AppendEntryRequest request = new AppendEntryRequest();
+ request.setBody(new byte[0]);
+ appendSuccess = appendToDledgerAndWait(request);
+ }
+ }
+ if (appendSuccess) {
+ final RemotingCommand response = RemotingCommand.createResponseCommandWithHeader(result.getResponseCode(), (CommandCustomHeader) result.getResponse());
+ this.future.complete(response);
+ } else {
+ log.error("Failed to append event to dledger, the response is {}, try cancel the future", result.getResponse());
+ this.future.cancel(true);
+ }
+ }
+
+ @Override
+ public CompletableFuture<RemotingCommand> future() {
+ return this.future;
+ }
+
+ @Override
+ public void handleException(final Throwable t) {
+ log.error("Error happen when handle event {}", this.name, t);
+ this.future.completeExceptionally(t);
+ }
+ }
+
+ /**
+ * Role change handler, trigger the startScheduling() and stopScheduling() when role change.
+ */
+ class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
+
+ private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER;
+ private final String selfId;
+ private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_"));
+
+ public RoleChangeHandler(final String selfId) {
+ this.selfId = selfId;
+ }
+
+ @Override
+ public void handle(long term, MemberState.Role role) {
+ Runnable runnable = () -> {
+ switch (role) {
+ case CANDIDATE:
+ this.currentRole = MemberState.Role.CANDIDATE;
+ log.info("Controller {} change role to candidate", this.selfId);
+ DledgerController.this.stopScheduling();
+ break;
+ case FOLLOWER:
+ this.currentRole = MemberState.Role.FOLLOWER;
+ log.info("Controller {} change role to Follower, leaderId:{}", this.selfId, getMemberState().getLeaderId());
+ DledgerController.this.stopScheduling();
+ break;
+ case LEADER: {
+ this.currentRole = MemberState.Role.LEADER;
+ log.info("Controller {} change role to leader, try process a initial proposal", this.selfId);
+ // Because the role becomes to leader, but the memory statemachine of the controller is still in the old point,
+ // some committed logs have not been applied. Therefore, we must first process an empty request to dledger,
+ // and after the request is committed, the controller can provide services(startScheduling).
+ int tryTimes = 0;
+ while (true) {
+ final AppendEntryRequest request = new AppendEntryRequest();
+ request.setBody(new byte[0]);
+ try {
+ if (appendToDledgerAndWait(request)) {
+ DledgerController.this.startScheduling();
+ break;
+ }
+ } catch (final Throwable e) {
+ log.error("Error happen when controller leader append initial request to dledger", e);
+ tryTimes++;
+ if (tryTimes > 2) {
+ log.warn("Controller leader append initial log failed too many times, please wait a while");
+ tryTimes = 0;
+ }
+ }
+ }
+ break;
+ }
+ }
+ };
+ this.executorService.submit(runnable);
+ }
+
+ @Override
+ public void startup() {
+ }
+
+ @Override
+ public void shutdown() {
+ if (this.currentRole == MemberState.Role.LEADER) {
+ DledgerController.this.stopScheduling();
+ }
+ this.executorService.shutdown();
+ }
+
+ public boolean isLeaderState() {
+ return this.currentRole == MemberState.Role.LEADER;
+ }
+ }
+
+ /**
+ * Append the request to dledger, wait the dledger to commit the request.
+ */
+ private boolean appendToDledgerAndWait(final AppendEntryRequest request) throws Throwable {
+ if (request != null) {
+ request.setGroup(this.dLedgerConfig.getGroup());
+ request.setRemoteId(this.dLedgerConfig.getSelfId());
+
+ final AppendFuture<AppendEntryResponse> dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+ if (dledgerFuture.getPos() == -1) {
+ return false;
+ }
+ dledgerFuture.get(5, TimeUnit.SECONDS);
+ return true;
+ }
+ return false;
+ }
+
+ // Only for test
+ public MemberState getMemberState() {
+ return this.dLedgerServer.getMemberState();
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerStateMachine.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerStateMachine.java
new file mode 100644
index 000000000..2eed8abfb
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerStateMachine.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.impl;
+
+import io.openmessaging.storage.dledger.entry.DLedgerEntry;
+import io.openmessaging.storage.dledger.snapshot.SnapshotReader;
+import io.openmessaging.storage.dledger.snapshot.SnapshotWriter;
+import io.openmessaging.storage.dledger.statemachine.CommittedEntryIterator;
+import io.openmessaging.storage.dledger.statemachine.StateMachine;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.manager.ReplicasInfoManager;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventSerializer;
+
+/**
+ * The state machine implementation of the dledger controller
+ */
+public class DledgerControllerStateMachine implements StateMachine {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+ private final ReplicasInfoManager replicasInfoManager;
+ private final EventSerializer eventSerializer;
+ private final String dledgerId;
+
+ public DledgerControllerStateMachine(final ReplicasInfoManager replicasInfoManager,
+ final EventSerializer eventSerializer, final String dledgerId) {
+ this.replicasInfoManager = replicasInfoManager;
+ this.eventSerializer = eventSerializer;
+ this.dledgerId = dledgerId;
+ }
+
+ @Override
+ public void onApply(CommittedEntryIterator iterator) {
+ int applyingSize = 0;
+ while (iterator.hasNext()) {
+ final DLedgerEntry entry = iterator.next();
+ final byte[] body = entry.getBody();
+ if (body != null && body.length > 0) {
+ final EventMessage event = this.eventSerializer.deserialize(body);
+ this.replicasInfoManager.applyEvent(event);
+ applyingSize ++;
+ }
+ }
+ log.info("Apply {} events on controller {}", applyingSize, this.dledgerId);
+ }
+
+ @Override
+ public void onSnapshotSave(SnapshotWriter writer, CompletableFuture<Boolean> future) {
+ }
+
+ @Override
+ public boolean onSnapshotLoad(SnapshotReader reader) {
+ return false;
+ }
+
+ @Override
+ public void onShutdown() {
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/BrokerIdInfo.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/BrokerIdInfo.java
new file mode 100644
index 000000000..e060c13ee
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/BrokerIdInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Broker id info, mapping from brokerAddress to brokerId.
+ */
+public class BrokerIdInfo {
+ private final String clusterName;
+ private final String brokerName;
+ // Start from 1
+ private final AtomicLong brokerIdCount;
+ private final HashMap<String/*Address*/, Long/*brokerId*/> brokerIdTable;
+
+ public BrokerIdInfo(String clusterName, String brokerName) {
+ this.clusterName = clusterName;
+ this.brokerName = brokerName;
+ this.brokerIdCount = new AtomicLong(1L);
+ this.brokerIdTable = new HashMap<>();
+ }
+
+ public long newBrokerId() {
+ return this.brokerIdCount.incrementAndGet();
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public HashMap<String, Long> getBrokerIdTable() {
+ return brokerIdTable;
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/InSyncReplicasInfo.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/InSyncReplicasInfo.java
new file mode 100644
index 000000000..8b8c5afd4
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/InSyncReplicasInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * In sync replicas info, manages the master and syncStateSet of a broker.
+ */
+public class InSyncReplicasInfo {
+ private final String clusterName;
+ private final String brokerName;
+
+ private Set<String/*Address*/> syncStateSet;
+ private int syncStateSetEpoch;
+
+ private String masterAddress;
+ private int masterEpoch;
+
+ public InSyncReplicasInfo(String clusterName, String brokerName, String masterAddress) {
+ this.clusterName = clusterName;
+ this.brokerName = brokerName;
+ this.masterAddress = masterAddress;
+ this.masterEpoch = 1;
+ this.syncStateSet = new HashSet<>();
+ this.syncStateSet.add(masterAddress);
+ this.syncStateSetEpoch = 1;
+ }
+
+ public void updateMasterInfo(String masterAddress) {
+ this.masterAddress = masterAddress;
+ this.masterEpoch++;
+ }
+
+ public void updateSyncStateSetInfo(Set<String> newSyncStateSet) {
+ this.syncStateSet = new HashSet<>(newSyncStateSet);
+ this.syncStateSetEpoch++;
+ }
+
+ public boolean isMasterExist() {
+ return !this.masterAddress.isEmpty();
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public Set<String> getSyncStateSet() {
+ return new HashSet<>(syncStateSet);
+ }
+
+ public int getSyncStateSetEpoch() {
+ return syncStateSetEpoch;
+ }
+
+ public String getMasterAddress() {
+ return masterAddress;
+ }
+
+ public int getMasterEpoch() {
+ return masterEpoch;
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java
new file mode 100644
index 000000000..622a765b5
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiPredicate;
+import java.util.function.Predicate;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.manager.event.AlterSyncStateSetEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ApplyBrokerIdEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventType;
+
+/**
+ * The manager that manages the replicas info for all brokers.
+ * We can think of this class as the controller's memory state machine
+ * It should be noted that this class is not thread safe,
+ * and the upper layer needs to ensure that it can be called sequentially
+ */
+public class ReplicasInfoManager {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+ private final boolean enableElectUncleanMaster;
+ private final Map<String/* brokerName */, BrokerIdInfo> replicaInfoTable;
+ private final Map<String/* brokerName */, InSyncReplicasInfo> inSyncReplicasInfoTable;
+
+ public ReplicasInfoManager(final ControllerConfig config) {
+ this.enableElectUncleanMaster = config.isEnableElectUncleanMaster();
+ this.replicaInfoTable = new HashMap<>();
+ this.inSyncReplicasInfoTable = new HashMap<>();
+ }
+
+ public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
+ final AlterSyncStateSetRequestHeader request, final BiPredicate<String, String> brokerAlivePredicate) {
+ final String brokerName = request.getBrokerName();
+ final ControllerResult<AlterSyncStateSetResponseHeader> result = new ControllerResult<>(new AlterSyncStateSetResponseHeader());
+ final AlterSyncStateSetResponseHeader response = result.getResponse();
+
+ if (isContainsBroker(brokerName)) {
+ final Set<String> newSyncStateSet = request.getNewSyncStateSet();
+ final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+ final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+ final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+
+ // Check master
+ if (!replicasInfo.getMasterAddress().equals(request.getMasterAddress())) {
+ log.info("Rejecting alter syncStateSet request because the current leader is:{}, not {}",
+ replicasInfo.getMasterAddress(), request.getMasterAddress());
+ result.setResponseCode(ResponseCode.CONTROLLER_INVALID_MASTER);
+ return result;
+ }
+
+ // Check master epoch
+ if (request.getMasterEpoch() != replicasInfo.getMasterEpoch()) {
+ log.info("Rejecting alter syncStateSet request because the current master epoch is:{}, not {}",
+ replicasInfo.getMasterEpoch(), request.getMasterEpoch());
+ result.setResponseCode(ResponseCode.CONTROLLER_FENCED_MASTER_EPOCH);
+ return result;
+ }
+
+ // Check syncStateSet epoch
+ if (request.getSyncStateSetEpoch() != replicasInfo.getSyncStateSetEpoch()) {
+ log.info("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{}, not {}",
+ replicasInfo.getSyncStateSetEpoch(), request.getSyncStateSetEpoch());
+ result.setResponseCode(ResponseCode.CONTROLLER_FENCED_SYNC_STATE_SET_EPOCH);
+ return result;
+ }
+
+ // Check newSyncStateSet correctness
+ for (String replicas : newSyncStateSet) {
+ if (!brokerIdTable.containsKey(replicas)) {
+ log.info("Rejecting alter syncStateSet request because the replicas {} don't exist", replicas);
+ result.setResponseCode(ResponseCode.CONTROLLER_INVALID_REPLICAS);
+ return result;
+ }
+ if (!brokerAlivePredicate.test(brokerInfo.getClusterName(), replicas)) {
+ log.info("Rejecting alter syncStateSet request because the replicas {} don't alive", replicas);
+ result.setResponseCode(ResponseCode.CONTROLLER_BROKER_NOT_ALIVE);
+ return result;
+ }
+ }
+
+ if (!newSyncStateSet.contains(replicasInfo.getMasterAddress())) {
+ log.info("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {}", replicasInfo.getMasterAddress());
+ result.setResponseCode(ResponseCode.CONTROLLER_INVALID_REQUEST);
+ return result;
+ }
+
+ // Generate event
+ response.setNewSyncStateSetEpoch(replicasInfo.getSyncStateSetEpoch() + 1);
+ response.setNewSyncStateSet(newSyncStateSet);
+ final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet);
+ result.addEvent(event);
+ return result;
+ }
+ result.setResponseCode(ResponseCode.CONTROLLER_INVALID_REQUEST);
+ return result;
+ }
+
+ public ControllerResult<ElectMasterResponseHeader> electMaster(
+ final ElectMasterRequestHeader request, final BiPredicate<String, String> brokerAlivePredicate) {
+ final String brokerName = request.getBrokerName();
+ final ControllerResult<ElectMasterResponseHeader> result = new ControllerResult<>(new ElectMasterResponseHeader());
+ if (isContainsBroker(brokerName)) {
+ final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+ final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+ final Set<String> syncStateSet = replicasInfo.getSyncStateSet();
+ // Try elect a master in syncStateSet
+ if (syncStateSet.size() > 1) {
+ boolean electSuccess = tryElectMaster(result, brokerName, syncStateSet, (candidate) ->
+ !candidate.equals(replicasInfo.getMasterAddress()) && brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate));
+ if (electSuccess) {
+ return result;
+ }
+ }
+
+ // Try elect a master in lagging replicas if enableElectUncleanMaster = true
+ if (enableElectUncleanMaster) {
+ final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+ boolean electSuccess = tryElectMaster(result, brokerName, brokerIdTable.keySet(), (candidate) ->
+ !candidate.equals(replicasInfo.getMasterAddress()) && brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate));
+ if (electSuccess) {
+ return result;
+ }
+ }
+
+ // If elect failed, we still need to apply an ElectMasterEvent to tell the statemachine
+ // that the master was shutdown and no new master was elected.
+ final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
+ result.addEvent(event);
+ result.setResponseCode(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE);
+ return result;
+ }
+ result.setResponseCode(ResponseCode.CONTROLLER_INVALID_REQUEST);
+ return result;
+ }
+
+ /**
+ * Try elect a new master in candidates
+ *
+ * @param filter return true if the candidate is available
+ * @return true if elect success
+ */
+ private boolean tryElectMaster(final ControllerResult<ElectMasterResponseHeader> result, final String brokerName,
+ final Set<String> candidates, final Predicate<String> filter) {
+ final int masterEpoch = this.inSyncReplicasInfoTable.get(brokerName).getMasterEpoch();
+ for (final String candidate : candidates) {
+ if (filter.test(candidate)) {
+ final ElectMasterResponseHeader response = result.getResponse();
+ response.setNewMasterAddress(candidate);
+ response.setMasterEpoch(masterEpoch + 1);
+
+ final ElectMasterEvent event = new ElectMasterEvent(brokerName, candidate);
+ result.addEvent(event);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public ControllerResult<RegisterBrokerResponseHeader> registerBroker(final RegisterBrokerRequestHeader request) {
+ final String brokerName = request.getBrokerName();
+ final String brokerAddress = request.getBrokerAddress();
+ final ControllerResult<RegisterBrokerResponseHeader> result = new ControllerResult<>(new RegisterBrokerResponseHeader());
+ final RegisterBrokerResponseHeader response = result.getResponse();
+ boolean canBeElectedAsMaster;
+ if (isContainsBroker(brokerName)) {
+ final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+ final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+ final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+
+ // Get brokerId.
+ long brokerId;
+ if (!brokerIdTable.containsKey(brokerAddress)) {
+ // If this broker replicas is first time come online, we need to apply a new id for this replicas.
+ brokerId = brokerInfo.newBrokerId();
+ final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(request.getBrokerName(),
+ brokerAddress, brokerId);
+ result.addEvent(applyIdEvent);
+ } else {
+ brokerId = brokerIdTable.get(brokerAddress);
+ }
+ response.setBrokerId(brokerId);
+ response.setMasterEpoch(replicasInfo.getMasterEpoch());
+
+ if (replicasInfo.isMasterExist()) {
+ // If the master is alive, just return master info.
+ response.setMasterAddress(replicasInfo.getMasterAddress());
+ return result;
+ } else {
+ // If the master is not alive, we should elect a new master:
+ // Case1: This replicas was in sync state set list
+ // Case2: The option {EnableElectUncleanMaster} is true
+ canBeElectedAsMaster = replicasInfo.getSyncStateSet().contains(brokerAddress) || this.enableElectUncleanMaster;
+ }
+ } else {
+ // If the broker's metadata does not exist in the state machine, the replicas can be elected as master directly.
+ canBeElectedAsMaster = true;
+ }
+
+ if (canBeElectedAsMaster) {
+ int masterEpoch = this.inSyncReplicasInfoTable.containsKey(brokerName) ?
+ this.inSyncReplicasInfoTable.get(brokerName).getMasterEpoch() + 1 : 1;
+ response.setMasterAddress(request.getBrokerAddress());
+ response.setMasterEpoch(masterEpoch);
+ response.setBrokerId(0);
+
+ final ElectMasterEvent event = new ElectMasterEvent(true, brokerName, brokerAddress, request.getClusterName());
+ result.addEvent(event);
+ return result;
+ }
+
+ response.setMasterAddress("");
+ result.setResponseCode(ResponseCode.CONTROLLER_INVALID_REQUEST);
+ return result;
+ }
+
+ public ControllerResult<GetReplicaInfoResponseHeader> getReplicaInfo(final GetReplicaInfoRequestHeader request) {
+ final String brokerName = request.getBrokerName();
+ final ControllerResult<GetReplicaInfoResponseHeader> result = new ControllerResult<>(new GetReplicaInfoResponseHeader());
+ final GetReplicaInfoResponseHeader response = result.getResponse();
+ if (isContainsBroker(brokerName)) {
+ // If exist broker metadata, just return metadata
+ final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+ response.setMasterAddress(replicasInfo.getMasterAddress());
+ response.setMasterEpoch(replicasInfo.getMasterEpoch());
+ response.setSyncStateSet(replicasInfo.getSyncStateSet());
+ response.setSyncStateSetEpoch(replicasInfo.getSyncStateSetEpoch());
+ return result;
+ }
+ result.setResponseCode(ResponseCode.CONTROLLER_INVALID_REQUEST);
+ return result;
+ }
+
+ /**
+ * Apply events to memory statemachine.
+ *
+ * @param event event message
+ */
+ public void applyEvent(final EventMessage event) {
+ final EventType type = event.getEventType();
+ switch (type) {
+ case ALTER_SYNC_STATE_SET_EVENT:
+ handleAlterSyncStateSet((AlterSyncStateSetEvent) event);
+ break;
+ case APPLY_BROKER_ID_EVENT:
+ handleApplyBrokerId((ApplyBrokerIdEvent) event);
+ break;
+ case ELECT_MASTER_EVENT:
+ handleElectMaster((ElectMasterEvent) event);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void handleAlterSyncStateSet(final AlterSyncStateSetEvent event) {
+ final String brokerName = event.getBrokerName();
+ if (isContainsBroker(brokerName)) {
+ final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+ replicasInfo.updateSyncStateSetInfo(event.getNewSyncStateSet());
+ }
+ }
+
+ private void handleApplyBrokerId(final ApplyBrokerIdEvent event) {
+ final String brokerName = event.getBrokerName();
+ if (isContainsBroker(brokerName)) {
+ final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+ final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+ if (!brokerIdTable.containsKey(event.getBrokerAddress())) {
+ brokerIdTable.put(event.getBrokerAddress(), event.getNewBrokerId());
+ }
+ }
+ }
+
+ private void handleElectMaster(final ElectMasterEvent event) {
+ final String brokerName = event.getBrokerName();
+ final String newMaster = event.getNewMasterAddress();
+ if (isContainsBroker(brokerName)) {
+ final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+
+ if (event.getNewMasterElected()) {
+ // Record new master
+ replicasInfo.updateMasterInfo(newMaster);
+
+ // Record new newSyncStateSet list
+ final HashSet<String> newSyncStateSet = new HashSet<>();
+ newSyncStateSet.add(newMaster);
+ replicasInfo.updateSyncStateSetInfo(newSyncStateSet);
+ } else {
+ // If new master was not elected, which means old master was shutdown and the newSyncStateSet list had no more replicas
+ // So we should delete old master, but retain newSyncStateSet list.
+ replicasInfo.updateMasterInfo("");
+ }
+ } else {
+ // When the first replicas of a broker come online,
+ // we can create memory meta information for the broker, and regard it as master
+ final String clusterName = event.getClusterName();
+ final BrokerIdInfo brokerInfo = new BrokerIdInfo(clusterName, brokerName);
+ final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+ final InSyncReplicasInfo replicasInfo = new InSyncReplicasInfo(clusterName, brokerName, newMaster);
+ brokerIdTable.put(newMaster, 1L);
+ this.inSyncReplicasInfoTable.put(brokerName, replicasInfo);
+ this.replicaInfoTable.put(brokerName, brokerInfo);
+ }
+ }
+
+ /**
+ * Is the broker existed in the memory metadata
+ *
+ * @return true if both existed in replicaInfoTable and inSyncReplicasInfoTable
+ */
+ private boolean isContainsBroker(final String brokerName) {
+ return this.replicaInfoTable.containsKey(brokerName) && this.inSyncReplicasInfoTable.containsKey(brokerName);
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/AlterSyncStateSetEvent.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/AlterSyncStateSetEvent.java
new file mode 100644
index 000000000..4da71e32b
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/AlterSyncStateSetEvent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager.event;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The event alters the syncStateSet of target broker.
+ * Triggered by the AlterSyncStateSetApi.
+ */
+public class AlterSyncStateSetEvent implements EventMessage {
+
+ private final String brokerName;
+ private final Set<String/*Address*/> newSyncStateSet;
+
+ public AlterSyncStateSetEvent(String brokerName, Set<String> newSyncStateSet) {
+ this.brokerName = brokerName;
+ this.newSyncStateSet = new HashSet<>(newSyncStateSet);
+ }
+
+ @Override
+ public EventType getEventType() {
+ return EventType.ALTER_SYNC_STATE_SET_EVENT;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public Set<String> getNewSyncStateSet() {
+ return new HashSet<>(newSyncStateSet);
+ }
+
+ @Override public String toString() {
+ return "AlterSyncStateSetEvent{" +
+ "brokerName='" + brokerName + '\'' +
+ ", newSyncStateSet=" + newSyncStateSet +
+ '}';
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ApplyBrokerIdEvent.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ApplyBrokerIdEvent.java
new file mode 100644
index 000000000..fc9860dbe
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ApplyBrokerIdEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager.event;
+
+/**
+ * The event trys to apply a new id for a new broker.
+ * Triggered by the RegisterBrokerApi.
+ */
+public class ApplyBrokerIdEvent implements EventMessage {
+ private final String brokerName;
+ private final String brokerAddress;
+ private final long newBrokerId;
+
+ public ApplyBrokerIdEvent(String brokerName, String brokerAddress, long newBrokerId) {
+ this.brokerName = brokerName;
+ this.brokerAddress = brokerAddress;
+ this.newBrokerId = newBrokerId;
+ }
+
+ @Override
+ public EventType getEventType() {
+ return EventType.APPLY_BROKER_ID_EVENT;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public String getBrokerAddress() {
+ return brokerAddress;
+ }
+
+ public long getNewBrokerId() {
+ return newBrokerId;
+ }
+
+ @Override public String toString() {
+ return "ApplyBrokerIdEvent{" +
+ "brokerName='" + brokerName + '\'' +
+ ", brokerAddress='" + brokerAddress + '\'' +
+ ", newBrokerId=" + newBrokerId +
+ '}';
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ControllerResult.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ControllerResult.java
new file mode 100644
index 000000000..9d5915260
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ControllerResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager.event;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+
+public class ControllerResult<T> {
+ private final List<EventMessage> events;
+ private final T response;
+ private int responseCode = ResponseCode.SUCCESS;
+
+ public ControllerResult(T response) {
+ this.events = new ArrayList<>();
+ this.response = response;
+ }
+
+ public ControllerResult(List<EventMessage> events, T response) {
+ this.events = new ArrayList<>(events);
+ this.response = response;
+ }
+
+ public List<EventMessage> getEvents() {
+ return new ArrayList<>(events);
+ }
+
+ public T getResponse() {
+ return response;
+ }
+
+ public void setResponseCode(int responseCode) {
+ this.responseCode = responseCode;
+ }
+
+ public int getResponseCode() {
+ return responseCode;
+ }
+
+ public static <T> ControllerResult<T> of(List<EventMessage> events, T response) {
+ return new ControllerResult<>(events, response);
+ }
+
+ public void addEvent(EventMessage event) {
+ this.events.add(event);
+ }
+
+ @Override public String toString() {
+ return "ControllerResult{" +
+ "events=" + events +
+ ", response=" + response +
+ '}';
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ElectMasterEvent.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ElectMasterEvent.java
new file mode 100644
index 000000000..69d71a47b
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ElectMasterEvent.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager.event;
+
+/**
+ * The event trys to elect a new master for target broker.
+ * Triggered by the ElectMasterApi.
+ */
+public class ElectMasterEvent implements EventMessage {
+ // Mark whether a new master was elected.
+ private final boolean newMasterElected;
+ private final String brokerName;
+ private final String newMasterAddress;
+ private final String clusterName;
+
+ public ElectMasterEvent(boolean newMasterElected, String brokerName) {
+ this(newMasterElected, brokerName, "", "");
+ }
+
+ public ElectMasterEvent(String brokerName, String newMasterAddress) {
+ this(true, brokerName, newMasterAddress, "");
+ }
+
+ public ElectMasterEvent(boolean newMasterElected, String brokerName, String newMasterAddress,
+ String clusterName) {
+ this.newMasterElected = newMasterElected;
+ this.brokerName = brokerName;
+ this.newMasterAddress = newMasterAddress;
+ this.clusterName = clusterName;
+ }
+
+ @Override
+ public EventType getEventType() {
+ return EventType.ELECT_MASTER_EVENT;
+ }
+
+ public boolean getNewMasterElected() {
+ return newMasterElected;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public String getNewMasterAddress() {
+ return newMasterAddress;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ @Override public String toString() {
+ return "ElectMasterEvent{" +
+ "isNewMasterElected=" + newMasterElected +
+ ", brokerName='" + brokerName + '\'' +
+ ", newMasterAddress='" + newMasterAddress + '\'' +
+ ", clusterName='" + clusterName + '\'' +
+ '}';
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventMessage.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventMessage.java
new file mode 100644
index 000000000..768fc0dd6
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventMessage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager.event;
+
+/**
+ * The parent class of Event, the subclass needs to indicate eventType.
+ */
+public interface EventMessage {
+
+ /**
+ * Returns the event type of this message
+ */
+ EventType getEventType();
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventSerializer.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventSerializer.java
new file mode 100644
index 000000000..66b956a6a
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventSerializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager.event;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.rocketmq.common.utils.FastJsonSerializer;
+
+/**
+ * EventMessage serializer
+ */
+public class EventSerializer {
+ private final FastJsonSerializer serializer;
+
+ public EventSerializer() {
+ this.serializer = new FastJsonSerializer();
+ }
+
+ private void putShort(byte[] memory, int index, int value) {
+ memory[index] = (byte) (value >>> 8);
+ memory[index + 1] = (byte) value;
+ }
+
+ private short getShort(byte[] memory, int index) {
+ return (short) (memory[index] << 8 | memory[index + 1] & 0xFF);
+ }
+
+ public byte[] serialize(EventMessage message) throws SerializationException {
+ final short eventType = message.getEventType().getId();
+ final byte[] data = this.serializer.serialize(message);
+ if (data != null && data.length > 0) {
+ final byte[] result = new byte[2 + data.length];
+ putShort(result, 0, eventType);
+ System.arraycopy(data, 0, result, 2, data.length);
+ return result;
+ }
+ return null;
+ }
+
+ public EventMessage deserialize(byte[] bytes) throws SerializationException {
+ if (bytes.length < 2) {
+ return null;
+ }
+ final short eventId = getShort(bytes, 0);
+ if (eventId > 0) {
+ final byte[] data = new byte[bytes.length - 2];
+ System.arraycopy(bytes, 2, data, 0, data.length);
+ final EventType eventType = EventType.from(eventId);
+ if (eventType != null) {
+ switch (eventType) {
+ case ALTER_SYNC_STATE_SET_EVENT:
+ return this.serializer.deserialize(data, AlterSyncStateSetEvent.class);
+ case APPLY_BROKER_ID_EVENT:
+ return this.serializer.deserialize(data, ApplyBrokerIdEvent.class);
+ case ELECT_MASTER_EVENT:
+ return this.serializer.deserialize(data, ElectMasterEvent.class);
+ default:
+ break;
+ }
+ }
+ }
+ return null;
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventType.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventType.java
new file mode 100644
index 000000000..9496d0a3e
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventType.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager.event;
+
+/**
+ * Event type (name, id);
+ */
+public enum EventType {
+ ALTER_SYNC_STATE_SET_EVENT("AlterSyncStateSetEvent", (short) 1),
+ APPLY_BROKER_ID_EVENT("ApplyBrokerIdEvent", (short) 2),
+ ELECT_MASTER_EVENT("ElectMasterEvent", (short) 3),
+ READ_EVENT("ReadEvent", (short) 4);
+
+ private final String name;
+ private final short id;
+
+ EventType(String name, short id) {
+ this.name = name;
+ this.id = id;
+ }
+
+ public static EventType from(short id) {
+ switch (id) {
+ case 1:
+ return ALTER_SYNC_STATE_SET_EVENT;
+ case 2:
+ return APPLY_BROKER_ID_EVENT;
+ case 3:
+ return ELECT_MASTER_EVENT;
+ case 4:
+ return READ_EVENT;
+ }
+ return null;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public short getId() {
+ return id;
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
new file mode 100644
index 000000000..d704eb286
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.processor;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ELECT_MASTER;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_METADATA_INFO;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_REPLICA_INFO;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_REGISTER_BROKER;
+
+/**
+ * Processor for controller request
+ */
+public class ControllerRequestProcessor implements NettyRequestProcessor {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+ private static final int WAIT_TIMEOUT_OUT = 5;
+ private final Controller controller;
+
+ public ControllerRequestProcessor(final Controller controller) {
+ this.controller = controller;
+ }
+
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+ if (ctx != null) {
+ log.debug("Receive request, {} {} {}",
+ request.getCode(),
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ request);
+ }
+ switch (request.getCode()) {
+ case CONTROLLER_ALTER_SYNC_STATE_SET: {
+ final AlterSyncStateSetRequestHeader controllerRequest = request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future = this.controller.alterSyncStateSet(controllerRequest);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ break;
+ }
+ case CONTROLLER_ELECT_MASTER: {
+ final ElectMasterRequestHeader controllerRequest = request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future = this.controller.electMaster(controllerRequest);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ break;
+ }
+ case CONTROLLER_REGISTER_BROKER: {
+ final RegisterBrokerRequestHeader controllerRequest = request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future = this.controller.registerBroker(controllerRequest);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ break;
+ }
+ case CONTROLLER_GET_REPLICA_INFO: {
+ final GetReplicaInfoRequestHeader controllerRequest = request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future = this.controller.getReplicaInfo(controllerRequest);
+ if (future != null) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ break;
+ }
+ case CONTROLLER_GET_METADATA_INFO: {
+ return this.controller.getControllerMetadata();
+ }
+ default: {
+ final String error = " request type " + request.getCode() + " not supported";
+ return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
+ }
+ }
+ return RemotingCommand.createResponseCommand(null);
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index f64640a26..dd206bbb4 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -34,27 +34,29 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.namesrv.NamesrvConfig;
-import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
-import org.apache.rocketmq.common.protocol.header.NotifyMinBrokerIdChangeRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.header.NotifyMinBrokerIdChangeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.namesrv.controller.Controller;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -80,6 +82,12 @@ public class RouteInfoManager {
private final NamesrvController namesrvController;
private final NamesrvConfig namesrvConfig;
+ private Controller controller;
+
+ public RouteInfoManager(final NamesrvConfig namesrvConfig, NamesrvController namesrvController, final Controller controller) {
+ this(namesrvConfig, namesrvController);
+ this.controller = controller;
+ }
public RouteInfoManager(final NamesrvConfig namesrvConfig, NamesrvController namesrvController) {
this.topicQueueTable = new ConcurrentHashMap<String, Map<String, QueueData>>(1024);
@@ -663,6 +671,15 @@ public class RouteInfoManager {
} else {
reducedBroker.add(brokerName);
}
+
+ // Check whether we need to elect a new master
+ if (this.namesrvController != null && this.namesrvController.getControllerConfig().isStartupController() && this.controller != null) {
+ if (unRegisterRequest.getBrokerId() == 0) {
+ this.controller.electMaster(new ElectMasterRequestHeader(unRegisterRequest.getBrokerName()));
+ // Todo: Inform the master
+ // However, because now the broker does not have the related api, so I will complete the process in the future.
+ }
+ }
}
Set<String> changedTopics = cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);
@@ -675,6 +692,7 @@ public class RouteInfoManager {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
+ e.printStackTrace();
log.error("unregisterBroker Exception", e);
}
}
@@ -1144,6 +1162,19 @@ public class RouteInfoManager {
return topicList;
}
+
+ /**
+ * @return true if the broker{brokerAddress} is alive
+ */
+ public boolean isBrokerAlive(final String clusterName, final String brokerAddress) {
+ final BrokerLiveInfo info = this.brokerLiveTable.get(new BrokerAddrInfo(clusterName, brokerAddress));
+ if (info != null) {
+ long last = info.getLastUpdateTimestamp();
+ long timeoutMillis = info.getHeartbeatTimeoutMillis();
+ return (last + timeoutMillis) >= System.currentTimeMillis();
+ }
+ return false;
+ }
}
/**
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerTest.java
new file mode 100644
index 000000000..627e982b6
--- /dev/null
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.impl;
+
+import io.openmessaging.storage.dledger.DLedgerConfig;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author hzh
+ * @email 642256541@qq.com
+ * @date 2022/4/20 11:05
+ */
+public class DledgerControllerTest {
+ private List<String> baseDirs;
+ private List<DledgerController> controllers;
+
+ public DledgerController launchController(final String group, final String peers, final String selfId, String storeType, final boolean isEnableElectUncleanMaster) {
+ final String path = "/tmp" + File.separator + group + File.separator + selfId;
+ baseDirs.add(path);
+
+ final ControllerConfig config = new ControllerConfig();
+ config.setControllerDLegerGroup(group);
+ config.setControllerDLegerPeers(peers);
+ config.setControllerDLegerSelfId(selfId);
+ config.setControllerStorePath(path);
+ config.setMappedFileSize(10 * 1024 * 1024);
+ config.setEnableElectUncleanMaster(isEnableElectUncleanMaster);
+
+ final DledgerController controller = new DledgerController(config, null);
+
+ controller.startup();
+ return controller;
+ }
+
+ @Before
+ public void startup() {
+ this.baseDirs = new ArrayList<>();
+ this.controllers = new ArrayList<>();
+ }
+
+ @After
+ public void tearDown() {
+ for (Controller controller : this.controllers) {
+ controller.shutdown();
+ }
+ for (String dir : this.baseDirs) {
+ System.out.println("Delete file " + dir);
+ new File(dir).delete();
+ }
+ }
+
+ public boolean registerNewBroker(Controller leader, String clusterName, String brokerName, String brokerAddress,
+ boolean isFirstRegisteredBroker) throws Exception {
+ // Register new broker
+ final RegisterBrokerRequestHeader registerRequest =
+ new RegisterBrokerRequestHeader(clusterName, brokerName, brokerAddress);
+ final RemotingCommand response = leader.registerBroker(registerRequest).get(10, TimeUnit.SECONDS);
+ final RegisterBrokerResponseHeader registerResult = (RegisterBrokerResponseHeader) response.readCustomHeader();
+ System.out.println("------------- Register broker done, the result is :" + registerResult);
+
+ if (!isFirstRegisteredBroker) {
+ assertTrue(registerResult.getBrokerId() > 0);
+ }
+ return true;
+ }
+
+ private boolean alterNewInSyncSet(Controller leader, String brokerName, String masterAddress, int masterEpoch,
+ Set<String> newSyncStateSet, int syncStateSetEpoch) throws Exception {
+ final AlterSyncStateSetRequestHeader alterRequest =
+ new AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch, newSyncStateSet, syncStateSetEpoch);
+ final RemotingCommand response = leader.alterSyncStateSet(alterRequest).get(10, TimeUnit.SECONDS);
+
+ final RemotingCommand getInfoResponse = leader.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName)).get(10, TimeUnit.SECONDS);
+ final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) getInfoResponse.readCustomHeader();
+ assertArrayEquals(replicaInfo.getSyncStateSet().toArray(), newSyncStateSet.toArray());
+ assertEquals(replicaInfo.getSyncStateSetEpoch(), syncStateSetEpoch + 1);
+ return true;
+ }
+
+ public DledgerController waitLeader(final List<DledgerController> controllers) throws Exception {
+ if (controllers.isEmpty()) {
+ return null;
+ }
+ DledgerController c1 = controllers.get(0);
+ while (c1.getMemberState().getLeaderId() == null) {
+ Thread.sleep(1000);
+ }
+ String leaderId = c1.getMemberState().getLeaderId();
+ System.out.println("New leader " + leaderId);
+ for (DledgerController controller : controllers) {
+ if (controller.getMemberState().getSelfId().equals(leaderId)) {
+ return controller;
+ }
+ }
+ return null;
+ }
+
+ public DledgerController mockMetaData(boolean enableElectUncleanMaster) throws Exception {
+ String group = UUID.randomUUID().toString();
+ String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", 30000, 30001, 30002);
+ DledgerController c0 = launchController(group, peers, "n0", DLedgerConfig.MEMORY, enableElectUncleanMaster);
+ DledgerController c1 = launchController(group, peers, "n1", DLedgerConfig.MEMORY, enableElectUncleanMaster);
+ DledgerController c2 = launchController(group, peers, "n2", DLedgerConfig.MEMORY, enableElectUncleanMaster);
+ controllers.add(c0);
+ controllers.add(c1);
+ controllers.add(c2);
+
+ DledgerController leader = waitLeader(controllers);
+
+ assertTrue(registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9000", true));
+ assertTrue(registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9001", true));
+ assertTrue(registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9002", true));
+ final RemotingCommand getInfoResponse = leader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS);
+ final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) getInfoResponse.readCustomHeader();
+ assertEquals(replicaInfo.getMasterEpoch(), 1);
+ assertEquals(replicaInfo.getMasterAddress(), "127.0.0.1:9000");
+
+ // Try alter sync state set
+ final HashSet<String> newSyncStateSet = new HashSet<>();
+ newSyncStateSet.add("127.0.0.1:9000");
+ newSyncStateSet.add("127.0.0.1:9001");
+ newSyncStateSet.add("127.0.0.1:9002");
+ assertTrue(alterNewInSyncSet(leader, "broker1", "127.0.0.1:9000", 1, newSyncStateSet, 1));
+ return leader;
+ }
+
+ @Test
+ public void testElectMaster() throws Exception {
+ final DledgerController leader = mockMetaData(false);
+ final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+ final RemotingCommand resp = leader.electMaster(request).get(10, TimeUnit.SECONDS);
+ final ElectMasterResponseHeader response = (ElectMasterResponseHeader) resp.readCustomHeader();
+ assertEquals(response.getMasterEpoch(), 2);
+ assertFalse(response.getNewMasterAddress().isEmpty());
+ assertNotEquals(response.getNewMasterAddress(), "127.0.0.1:9000");
+ }
+
+ @Test
+ public void testAllReplicasShutdownAndRestartWithUnEnableElectUnCleanMaster() throws Exception {
+ final DledgerController leader = mockMetaData(false);
+ final HashSet<String> newSyncStateSet = new HashSet<>();
+ newSyncStateSet.add("127.0.0.1:9000");
+
+ assertTrue(alterNewInSyncSet(leader, "broker1", "127.0.0.1:9000", 1, newSyncStateSet, 2));
+
+ // Now we trigger electMaster api, which means the old master is shutdown and want to elect a new master.
+ // However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, not more replicas can be elected as master, it will be failed.
+ final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
+ final RemotingCommand resp = leader.electMaster(electRequest).get(10, TimeUnit.SECONDS);
+
+ final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) leader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).
+ get(10, TimeUnit.SECONDS).readCustomHeader();
+ assertEquals(replicaInfo.getSyncStateSet(), newSyncStateSet);
+ assertEquals(replicaInfo.getMasterAddress(), "");
+ assertEquals(replicaInfo.getMasterEpoch(), 2);
+
+ // Now, we start broker1 - 127.0.0.1:9001, but it was not in syncStateSet, so it will not be elected as master.
+ final RegisterBrokerRequestHeader request1 =
+ new RegisterBrokerRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
+ final RegisterBrokerResponseHeader r1 = (RegisterBrokerResponseHeader) leader.registerBroker(request1).get(10, TimeUnit.SECONDS).readCustomHeader();
+ assertEquals(r1.getBrokerId(), 2);
+ assertEquals(r1.getMasterAddress(), "");
+ assertEquals(r1.getMasterEpoch(), 2);
+
+ // Now, we start broker1 - 127.0.0.1:9000, it will be elected as master
+ final RegisterBrokerRequestHeader request2 =
+ new RegisterBrokerRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
+ final RegisterBrokerResponseHeader r2 = (RegisterBrokerResponseHeader) leader.registerBroker(request2).get(10, TimeUnit.SECONDS).readCustomHeader();
+ assertEquals(r2.getBrokerId(), 0);
+ assertEquals(r2.getMasterAddress(), "127.0.0.1:9000");
+ assertEquals(r2.getMasterEpoch(), 3);
+ }
+
+ @Test
+ public void testEnableElectUnCleanMaster() throws Exception {
+ final DledgerController leader = mockMetaData(true);
+ final HashSet<String> newSyncStateSet = new HashSet<>();
+ newSyncStateSet.add("127.0.0.1:9000");
+
+ assertTrue(alterNewInSyncSet(leader, "broker1", "127.0.0.1:9000", 1, newSyncStateSet, 2));
+
+ // Now we trigger electMaster api, which means the old master is shutdown and want to elect a new master.
+ // However, event if the syncStateSet in statemachine is {"127.0.0.1:9000"}
+ // the option {enableElectUncleanMaster = true}, so the controller sill can elect a new master
+ final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
+ final CompletableFuture<RemotingCommand> future = leader.electMaster(electRequest);
+ future.get(10, TimeUnit.SECONDS);
+
+ final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) leader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS).readCustomHeader();
+ final HashSet<String> newSyncStateSet2 = new HashSet<>();
+ newSyncStateSet2.add(replicaInfo.getMasterAddress());
+ assertEquals(replicaInfo.getSyncStateSet(), newSyncStateSet2);
+ assertNotEquals(replicaInfo.getMasterAddress(), "");
+ assertNotEquals(replicaInfo.getMasterAddress(), "127.0.0.1:9000");
+ assertEquals(replicaInfo.getMasterEpoch(), 2);
+ }
+
+ @Test
+ public void testChangeControllerLeader() throws Exception {
+ final DledgerController leader = mockMetaData(false);
+ leader.shutdown();
+ Thread.sleep(2000);
+ this.controllers.remove(leader);
+ // Wait leader again
+ final DledgerController newLeader = waitLeader(this.controllers);
+ assertNotNull(newLeader);
+
+ final GetReplicaInfoResponseHeader response = (GetReplicaInfoResponseHeader) newLeader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS).readCustomHeader();
+ assertEquals(response.getMasterAddress(), "127.0.0.1:9000");
+ assertEquals(response.getMasterEpoch(), 1);
+
+ final HashSet<String> syncStateSet = new HashSet<>();
+ syncStateSet.add("127.0.0.1:9000");
+ syncStateSet.add("127.0.0.1:9001");
+ syncStateSet.add("127.0.0.1:9002");
+ assertEquals(response.getSyncStateSet(), syncStateSet);
+ }
+}
\ No newline at end of file
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManagerTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManagerTest.java
new file mode 100644
index 000000000..1a2058842
--- /dev/null
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManagerTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ReplicasInfoManagerTest {
+ private ReplicasInfoManager replicasInfoManager;
+
+ @Before
+ public void init() {
+ final ControllerConfig config = new ControllerConfig();
+ config.setEnableElectUncleanMaster(false);
+ this.replicasInfoManager = new ReplicasInfoManager(config);
+ }
+
+ public boolean registerNewBroker(String clusterName, String brokerName, String brokerAddress, boolean isFirstRegisteredBroker) {
+ // Register new broker
+ final RegisterBrokerRequestHeader registerRequest =
+ new RegisterBrokerRequestHeader(clusterName, brokerName, brokerAddress);
+ final ControllerResult<RegisterBrokerResponseHeader> registerResult = this.replicasInfoManager.registerBroker(registerRequest);
+ apply(registerResult.getEvents());
+
+ if (isFirstRegisteredBroker) {
+ final ControllerResult<GetReplicaInfoResponseHeader> getInfoResult = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName));
+ final GetReplicaInfoResponseHeader replicaInfo = getInfoResult.getResponse();
+ assertEquals(replicaInfo.getMasterAddress(), brokerAddress);
+ assertEquals(replicaInfo.getMasterEpoch(), 1);
+ assertEquals(replicaInfo.getSyncStateSet().size(), 1);
+ } else {
+ final RegisterBrokerResponseHeader response = registerResult.getResponse();
+ assertTrue(response.getBrokerId() > 0);
+ }
+ return true;
+ }
+
+ private boolean alterNewInSyncSet(String brokerName, String masterAddress, int masterEpoch, Set<String> newSyncStateSet, int syncStateSetEpoch) {
+ final AlterSyncStateSetRequestHeader alterRequest =
+ new AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch, newSyncStateSet, syncStateSetEpoch);
+ final ControllerResult<AlterSyncStateSetResponseHeader> result = this.replicasInfoManager.alterSyncStateSet(alterRequest, (va1, va2) -> true);
+ apply(result.getEvents());
+
+ final GetReplicaInfoResponseHeader replicaInfo = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName)).getResponse();
+ assertArrayEquals(replicaInfo.getSyncStateSet().toArray(), newSyncStateSet.toArray());
+ assertEquals(replicaInfo.getSyncStateSetEpoch(), syncStateSetEpoch + 1);
+ return true;
+ }
+
+ private void apply(final List<EventMessage> events) {
+ for (EventMessage event : events) {
+ this.replicasInfoManager.applyEvent(event);
+ }
+ }
+
+
+ public void mockMetaData() {
+ registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", true);
+ registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", false);
+ registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", false);
+ final HashSet<String> newSyncStateSet = new HashSet<>();
+ newSyncStateSet.add("127.0.0.1:9000");
+ newSyncStateSet.add("127.0.0.1:9001");
+ newSyncStateSet.add("127.0.0.1:9002");
+ assertTrue(alterNewInSyncSet("broker1", "127.0.0.1:9000", 1, newSyncStateSet, 1));
+ }
+
+ @Test
+ public void testElectMaster() {
+ mockMetaData();
+ final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+ final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request, (va1, va2) -> true);
+ final ElectMasterResponseHeader response = cResult.getResponse();
+ assertEquals(response.getMasterEpoch(), 2);
+ assertFalse(response.getNewMasterAddress().isEmpty());
+ assertNotEquals(response.getNewMasterAddress(), "127.0.0.1:9000");
+ }
+
+ @Test
+ public void testAllReplicasShutdownAndRestart() {
+ mockMetaData();
+ final HashSet<String> newSyncStateSet = new HashSet<>();
+ newSyncStateSet.add("127.0.0.1:9000");
+ assertTrue(alterNewInSyncSet("broker1", "127.0.0.1:9000", 1, newSyncStateSet, 2));
+
+ // Now we trigger electMaster api, which means the old master is shutdown and want to elect a new master.
+ // However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, not more replicas can be elected as master, it will be failed.
+ final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
+ final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(electRequest, (va1, va2) -> true);
+ final List<EventMessage> events = cResult.getEvents();
+ assertEquals(events.size(), 1);
+ final ElectMasterEvent event = (ElectMasterEvent) events.get(0);
+ assertFalse(event.getNewMasterElected());
+
+ apply(cResult.getEvents());
+
+ final GetReplicaInfoResponseHeader replicaInfo = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).getResponse();
+ assertEquals(replicaInfo.getMasterAddress(), "");
+ assertEquals(replicaInfo.getMasterEpoch(), 2);
+ }
+
+}
\ No newline at end of file