You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/04/23 13:41:53 UTC

[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4195: [Summer of Code] Dledger controller

RongtongJin commented on code in PR #4195:
URL: https://github.com/apache/rocketmq/pull/4195#discussion_r856890750


##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerController.java:
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.impl;
+
+import io.openmessaging.storage.dledger.AppendFuture;
+import io.openmessaging.storage.dledger.DLedgerConfig;
+import io.openmessaging.storage.dledger.DLedgerLeaderElector;
+import io.openmessaging.storage.dledger.DLedgerServer;
+import io.openmessaging.storage.dledger.MemberState;
+import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
+import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
+import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.namesrv.controller.manager.ReplicasInfoManager;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventSerializer;
+import org.apache.rocketmq.remoting.common.ServiceThread;
+
+/**
+ * The implementation of controller, based on dledger (raft).
+ */
+public class DledgerController implements Controller {
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private final DLedgerServer dLedgerServer;
+    private final DLedgerConfig dLedgerConfig;
+    private final ReplicasInfoManager replicasInfoManager;
+    private final EventScheduler scheduler;
+    private final EventSerializer eventSerializer;
+    private final RoleChangeHandler roleHandler;
+    private final DledgerControllerStateMachine statemachine;
+    private volatile boolean isScheduling = false;
+
+    /**
+     * Builds the controller and wires its collaborators together.
+     * The DLedger server is created here but not started; that happens in startup().
+     *
+     * @param dLedgerConfig DLedger configuration; also supplies the self id handed to the role handler and the state machine
+     * @param isEnableElectUncleanMaster flag forwarded to ReplicasInfoManager; it is passed by value, so only the value
+     *                                   seen at construction time is used
+     */
+    public DledgerController(final DLedgerConfig dLedgerConfig, final boolean isEnableElectUncleanMaster) {
+        this.dLedgerConfig = dLedgerConfig;
+
+        this.eventSerializer = new EventSerializer();
+
+        this.scheduler = new EventScheduler();
+        this.roleHandler = new RoleChangeHandler(dLedgerConfig.getSelfId());
+        this.replicasInfoManager = new ReplicasInfoManager(isEnableElectUncleanMaster);
+        // The state machine shares the same ReplicasInfoManager and serializer instances used by this controller.
+        this.statemachine = new DledgerControllerStateMachine(replicasInfoManager, this.eventSerializer, dLedgerConfig.getSelfId());
+
+        // Register statemachine and role handler.
+        this.dLedgerServer = new DLedgerServer(dLedgerConfig);
+        this.dLedgerServer.registerStateMachine(this.statemachine);
+        this.dLedgerServer.getdLedgerLeaderElector().addRoleChangeHandler(this.roleHandler);
+    }
+
+    /**
+     * Starts the underlying DLedger server. The event scheduler is not started here;
+     * that is done by startScheduling().
+     */
+    @Override
+    public void startup() {
+        this.dLedgerServer.startup();
+    }
+
+    /**
+     * Shuts down the underlying DLedger server.
+     * NOTE(review): the event scheduler is not stopped here; presumably callers are expected
+     * to invoke stopScheduling() separately - confirm.
+     */
+    @Override
+    public void shutdown() {
+        this.dLedgerServer.shutdown();
+    }
+
+    /**
+     * Starts the event scheduler thread if scheduling is not already marked as active.
+     * NOTE(review): isScheduling is volatile, but the check and the assignment below are not
+     * one atomic step; two concurrent callers could both pass the check - confirm callers are serialized.
+     */
+    @Override
+    public void startScheduling() {
+        if (!this.isScheduling) {
+            log.info("Start scheduling controller events");
+            this.isScheduling = true;
+            this.scheduler.start();
+        }
+    }
+
+    /**
+     * Stops the event scheduler thread if scheduling is currently marked as active.
+     * NOTE(review): the meaning of the boolean passed to shutdown() is defined by ServiceThread,
+     * presumably "interrupt the thread" - confirm. Whether the scheduler can be started again
+     * afterwards also depends on the ServiceThread implementation in use.
+     */
+    @Override
+    public void stopScheduling() {
+        if (this.isScheduling) {
+            log.info("Stop scheduling controller events");
+            this.isScheduling = false;
+            this.scheduler.shutdown(true);
+        }
+    }

Review Comment:
   建议使用org.apache.rocketmq.common.ServiceThread而非org.apache.rocketmq.remoting.common.ServiceThread,不然EventScheduler在shutdown后将不能再start



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/Controller.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+
+/**
+ * The api for controller
+ */
+public interface Controller {
+
+    /**
+     * A unit of controller work that can be queued and executed later.
+     *
+     * @param <T> the type of value delivered through {@link #future()}
+     */
+    interface EventHandler<T> {
+        /**
+         * Runs the controller event.
+         *
+         * @throws Throwable any failure raised while running the event
+         */
+        void run() throws Throwable;
+
+        /**
+         * Returns the CompletableFuture associated with this event.
+         *
+         * @return the future typed with this handler's result type
+         */
+        CompletableFuture<T> future();
+
+        /**
+         * Handles a failure raised while running the event.
+         *
+         * @param t the failure to handle
+         */
+        void handleException(final Throwable t);
+    }

Review Comment:
   或者可以另起一个文件写这个接口



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerController.java:
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.impl;
+
+import io.openmessaging.storage.dledger.AppendFuture;
+import io.openmessaging.storage.dledger.DLedgerConfig;
+import io.openmessaging.storage.dledger.DLedgerLeaderElector;
+import io.openmessaging.storage.dledger.DLedgerServer;
+import io.openmessaging.storage.dledger.MemberState;
+import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
+import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
+import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.namesrv.controller.manager.ReplicasInfoManager;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventSerializer;
+import org.apache.rocketmq.remoting.common.ServiceThread;
+
+/**
+ * The implementation of controller, based on dledger (raft).
+ */
+public class DledgerController implements Controller {
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private final DLedgerServer dLedgerServer;
+    private final DLedgerConfig dLedgerConfig;
+    private final ReplicasInfoManager replicasInfoManager;
+    private final EventScheduler scheduler;
+    private final EventSerializer eventSerializer;
+    private final RoleChangeHandler roleHandler;
+    private final DledgerControllerStateMachine statemachine;
+    private volatile boolean isScheduling = false;
+
+    /**
+     * Builds the controller and wires its collaborators together.
+     * The DLedger server is created here but not started; that happens in startup().
+     *
+     * @param dLedgerConfig DLedger configuration; also supplies the self id handed to the role handler and the state machine
+     * @param isEnableElectUncleanMaster flag forwarded to ReplicasInfoManager; it is passed by value, so only the value
+     *                                   seen at construction time is used
+     */
+    public DledgerController(final DLedgerConfig dLedgerConfig, final boolean isEnableElectUncleanMaster) {
+        this.dLedgerConfig = dLedgerConfig;
+
+        this.eventSerializer = new EventSerializer();
+
+        this.scheduler = new EventScheduler();
+        this.roleHandler = new RoleChangeHandler(dLedgerConfig.getSelfId());
+        this.replicasInfoManager = new ReplicasInfoManager(isEnableElectUncleanMaster);
+        // The state machine shares the same ReplicasInfoManager and serializer instances used by this controller.
+        this.statemachine = new DledgerControllerStateMachine(replicasInfoManager, this.eventSerializer, dLedgerConfig.getSelfId());
+
+        // Register statemachine and role handler.
+        this.dLedgerServer = new DLedgerServer(dLedgerConfig);
+        this.dLedgerServer.registerStateMachine(this.statemachine);
+        this.dLedgerServer.getdLedgerLeaderElector().addRoleChangeHandler(this.roleHandler);
+    }
+
+    /**
+     * Starts the underlying DLedger server. The event scheduler is not started here;
+     * that is done by startScheduling().
+     */
+    @Override
+    public void startup() {
+        this.dLedgerServer.startup();
+    }
+
+    /**
+     * Shuts down the underlying DLedger server.
+     * NOTE(review): the event scheduler is not stopped here; presumably callers are expected
+     * to invoke stopScheduling() separately - confirm.
+     */
+    @Override
+    public void shutdown() {
+        this.dLedgerServer.shutdown();
+    }
+
+    /**
+     * Starts the event scheduler thread if scheduling is not already marked as active.
+     * NOTE(review): isScheduling is volatile, but the check and the assignment below are not
+     * one atomic step; two concurrent callers could both pass the check - confirm callers are serialized.
+     */
+    @Override
+    public void startScheduling() {
+        if (!this.isScheduling) {
+            log.info("Start scheduling controller events");
+            this.isScheduling = true;
+            this.scheduler.start();
+        }
+    }
+
+    /**
+     * Stops the event scheduler thread if scheduling is currently marked as active.
+     * NOTE(review): the meaning of the boolean passed to shutdown() is defined by ServiceThread,
+     * presumably "interrupt the thread" - confirm. Whether the scheduler can be started again
+     * afterwards also depends on the ServiceThread implementation in use.
+     */
+    @Override
+    public void stopScheduling() {
+        if (this.isScheduling) {
+            log.info("Stop scheduling controller events");
+            this.isScheduling = false;
+            this.scheduler.shutdown(true);
+        }
+    }
+
+    /**
+     * Queues an alterSyncStateSet request as a write event on the event scheduler.
+     *
+     * @param request the alter-syncStateSet request header
+     * @return the future produced by the scheduler, or {@code null} when this controller is not
+     *         in leader state; the scheduler itself may also return {@code null}
+     */
+    @Override
+    public CompletableFuture<AlterSyncStateSetResponseHeader> alterSyncStateSet(
+        AlterSyncStateSetRequestHeader request) {
+        if (!this.roleHandler.isLeaderState()) {
+            log.warn("Current controller {} is not leader, reject alterSyncStateSet request", this.dLedgerConfig.getSelfId());
+            // NOTE(review): callers must null-check the result; a failed future would be safer - confirm intent.
+            return null;
+        }
+        // The supplier is not evaluated here; it is handed to the scheduler together with isWriteEvent = true.
+        return this.scheduler.appendEvent("alterSyncStateSet",
+            () -> this.replicasInfoManager.alterSyncStateSet(request), true);
+    }
+
+    /**
+     * Queues an electMaster request as a write event on the event scheduler.
+     *
+     * @param request the elect-master request header
+     * @return the future produced by the scheduler, or {@code null} when this controller is not
+     *         in leader state; the scheduler itself may also return {@code null}
+     */
+    @Override
+    public CompletableFuture<ElectMasterResponseHeader> electMaster(final ElectMasterRequestHeader request) {
+        if (!this.roleHandler.isLeaderState()) {
+            log.warn("Current controller {} is not leader, reject electMaster request", this.dLedgerConfig.getSelfId());
+            // NOTE(review): callers must null-check the result; a failed future would be safer - confirm intent.
+            return null;
+        }
+        return this.scheduler.appendEvent("electMaster",
+            () -> this.replicasInfoManager.electMaster(request), true);
+    }
+
+    /**
+     * Queues a registerBroker request as a write event on the event scheduler.
+     *
+     * @param request the register-broker request header
+     * @return the future produced by the scheduler, or {@code null} when this controller is not
+     *         in leader state; the scheduler itself may also return {@code null}
+     */
+    @Override
+    public CompletableFuture<RegisterBrokerResponseHeader> registerBroker(RegisterBrokerRequestHeader request) {
+        if (!this.roleHandler.isLeaderState()) {
+            log.warn("Current controller {} is not leader, reject registerBroker request", this.dLedgerConfig.getSelfId());
+            // NOTE(review): callers must null-check the result; a failed future would be safer - confirm intent.
+            return null;
+        }
+        return this.scheduler.appendEvent("registerBroker",
+            () -> this.replicasInfoManager.registerBroker(request), true);
+    }
+
+    /**
+     * Queues a getReplicaInfo request on the event scheduler as a non-write event
+     * (isWriteEvent = false).
+     *
+     * @param request the get-replica-info request header
+     * @return the future produced by the scheduler, or {@code null} when this controller is not
+     *         in leader state; the scheduler itself may also return {@code null}
+     */
+    @Override
+    public CompletableFuture<GetReplicaInfoResponseHeader> getReplicaInfo(final GetReplicaInfoRequestHeader request) {
+        if (!this.roleHandler.isLeaderState()) {
+            log.warn("Current controller {} is not leader, reject getReplicaInfo request", this.dLedgerConfig.getSelfId());
+            // NOTE(review): callers must null-check the result; a failed future would be safer - confirm intent.
+            return null;
+        }
+        return this.scheduler.appendEvent("getReplicaInfo",
+            () -> this.replicasInfoManager.getReplicaInfo(request), false);
+    }
+
+    /**
+     * Builds a metadata response from the current member state.
+     * Unlike the request methods above, this does not check for leader state and does not go
+     * through the event scheduler.
+     *
+     * @return a header carrying the leader id and leader address read from the member state
+     */
+    @Override
+    public GetMetaDataResponseHeader getControllerMetadata() {
+        final MemberState state = getMemberState();
+        return new GetMetaDataResponseHeader(state.getLeaderId(), state.getLeaderAddr());
+    }
+
+    /**
+     * Event scheduler: takes event handlers from a bounded queue and runs them one at a time
+     * inside {@link #run()}.
+     */
+    class EventScheduler extends ServiceThread {
+        // NOTE(review): raw EventHandler type; a wildcard such as EventHandler<?> would avoid unchecked usage - confirm.
+        private final BlockingQueue<EventHandler> eventQueue;
+
+        /**
+         * Creates the scheduler with an event queue bounded at 1024 entries.
+         */
+        public EventScheduler() {
+            this.eventQueue = new LinkedBlockingQueue<>(1024);
+        }
+
+        /**
+         * @return the fully qualified class name of this scheduler
+         */
+        @Override
+        public String getServiceName() {
+            return EventScheduler.class.getName();
+        }
+
+        /**
+         * Main loop: until the thread is marked stopped, waits up to 5 seconds for the next
+         * handler and runs it. Anything thrown by the handler is passed to its handleException.
+         */
+        @Override
+        public void run() {
+            log.info("Start event scheduler.");
+            while (!isStopped()) {
+                EventHandler handler;
+                try {
+                    handler = this.eventQueue.poll(5, TimeUnit.SECONDS);
+                } catch (final InterruptedException e) {
+                    // The interrupt is consumed without logging or restoring the flag; the loop then re-checks isStopped().
+                    continue;
+                }
+                try {
+                    // poll() returns null when the wait times out with no event available.
+                    if (handler != null) {
+                        handler.run();
+                    }
+                } catch (final Throwable e) {
+                    // Only handler.run() can throw inside the try block, so handler is non-null here.
+                    handler.handleException(e);
+                }
+            }
+
+        }
+
+        /**
+         * Wraps the supplier in a ControllerEventHandler and puts it on the event queue.
+         *
+         * @param name event name passed to the handler
+         * @param supplier produces the ControllerResult when the handler runs
+         * @param isWriteEvent passed through to the handler unchanged
+         * @param <T> result type of the returned future
+         * @return the handler's future once the event is queued; {@code null} if the scheduler is
+         *         stopped or the calling thread was interrupted more than 3 times while waiting
+         */
+        public <T> CompletableFuture<T> appendEvent(final String name, final Supplier<ControllerResult<T>> supplier,
+            boolean isWriteEvent) {
+            if (isStopped()) {
+                return null;
+            }
+            final EventHandler<T> event = new ControllerEventHandler<>(name, supplier, isWriteEvent);
+            int tryTimes = 0;
+            while (true) {
+                try {
+                    // NOTE(review): a timed-out offer (queue still full) retries without touching tryTimes,
+                    // so the caller can block indefinitely while the queue stays full - confirm this is intended.
+                    if (!this.eventQueue.offer(event, 5, TimeUnit.SECONDS)) {
+                        continue;
+                    }
+                    return event.future();
+                } catch (final InterruptedException e) {
+                    log.error("Error happen in EventScheduler when append event", e);
+                    // Only interrupts count toward the retry limit; the interrupt flag is not restored.
+                    tryTimes++;
+                    if (tryTimes > 3) {
+                        return null;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Event handler, get events from supplier, and append events to dledger
+     */
+    class ControllerEventHandler<T> implements EventHandler<T> {
+        private final String name;
+        private final Supplier<ControllerResult<T>> supplier;
+        private final CompletableFuture<T> future;
+        private final boolean isWriteEvent;
+
+        ControllerEventHandler(final String name, final Supplier<ControllerResult<T>> supplier,
+            final boolean isWriteEvent) {
+            this.name = name;
+            this.supplier = supplier;
+            this.future = new CompletableFuture<>();
+            this.isWriteEvent = isWriteEvent;
+        }
+
+        @Override
+        public void run() throws Throwable {
+            final ControllerResult<T> result = this.supplier.get();
+            log.info("Event queue run event {}, get the result {}", this.name, result);
+            boolean appendSuccess = true;
+            if (this.isWriteEvent) {
+                final List<EventMessage> events = result.getEvents();
+                final List<byte[]> eventBytes = new ArrayList<>(events.size());
+                for (final EventMessage event : events) {
+                    if (event != null) {
+                        final byte[] data = DledgerController.this.eventSerializer.serialize(event);
+                        if (data != null && data.length > 0) {
+                            eventBytes.add(data);
+                        }
+                    }
+                }
+                // Append events to dledger
+                if (!eventBytes.isEmpty()) {
+                    final BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+                    request.setBatchMsgs(eventBytes);
+                    appendSuccess = appendToDledgerAndWait(request);
+                }
+            } else {
+                // Now the dledger don't have the function of Read-Index or Lease-Read,
+                // So we still need to propose an empty request to dledger.
+                final AppendEntryRequest request = new AppendEntryRequest();
+                request.setBody(new byte[0]);
+                appendSuccess = appendToDledgerAndWait(request);
+            }

Review Comment:
   getReplicaInfo不建议每次走读日志,建议直接读内存,最终一致性。因为这个操作在broker端会比较频繁,如果broker很多的情况下,对controller的压力很大



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ErrorCodes;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.manager.event.AlterSyncStateSetEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ApplyBrokerIdEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventType;
+
+/**
+ * The manager that manages the replicas info for all brokers.
+ * We can think of this class as the controller's memory state machine
+ * It should be noted that this class is not thread safe,
+ * and the upper layer needs to ensure that it can be called sequentially
+ */
+public class ReplicasInfoManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private static final Long LEADER_ID = 0L;
+    private final boolean enableElectUncleanMaster;
+    private final Map<String/* brokerName */, BrokerIdInfo> replicaInfoTable;
+    private final Map<String/* brokerName */, InSyncReplicasInfo> inSyncReplicasInfoTable;
+
+    /**
+     * Creates a manager with empty replica tables.
+     *
+     * @param enableElectUncleanMaster stored in a final field, so the value is fixed for the lifetime of this instance
+     */
+    public ReplicasInfoManager(final boolean enableElectUncleanMaster) {
+        this.enableElectUncleanMaster = enableElectUncleanMaster;
+        // Plain HashMaps: consistent with the class comment that callers must serialize access.
+        this.replicaInfoTable = new HashMap<>();
+        this.inSyncReplicasInfoTable = new HashMap<>();
+    }
+
+    /********************************    The following methods don't update statemachine, triggered by controller leader   ********************************/
+
+    /**
+     * Validates a request to change a broker group's syncStateSet and, when valid, produces the
+     * event describing the change. This method does not modify the tables itself; it only fills
+     * the response and adds an event to the result.
+     *
+     * @param request carries the broker name, master address, master epoch, syncStateSet epoch and the proposed set
+     * @return a result whose response holds either an error code or the new syncStateSet with its
+     *         incremented epoch; an AlterSyncStateSetEvent is attached only on success
+     */
+    public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
+        final AlterSyncStateSetRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<AlterSyncStateSetResponseHeader> result = new ControllerResult<>(new AlterSyncStateSetResponseHeader());
+        final AlterSyncStateSetResponseHeader response = result.getResponse();
+
+        if (isContainsBroker(brokerName)) {
+            final Set<String> newSyncStateSet = request.getNewSyncStateSet();
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+
+            // Check master
+            // NOTE(review): throws NullPointerException if getMasterAddress() can return null
+            // (e.g. after a failed election) - confirm how "no master" is represented.
+            if (!replicasInfo.getMasterAddress().equals(request.getMasterAddress())) {
+                log.info("Rejecting alter syncStateSet request because the current leader is:{}, not {}",
+                    replicasInfo.getMasterAddress(), request.getMasterAddress());
+                response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                return result;
+            }
+
+            // Check master epoch
+            // NOTE(review): != is only a value comparison if both getters return primitives - confirm they are not boxed.
+            if (request.getMasterEpoch() != replicasInfo.getMasterEpoch()) {
+                log.info("Rejecting alter syncStateSet request because the current master epoch is:{}, not {}",
+                    replicasInfo.getMasterEpoch(), request.getMasterEpoch());
+                response.setErrorCode(ErrorCodes.FENCED_LEADER_EPOCH.getCode());
+                return result;
+            }
+
+            // Check syncStateSet epoch
+            if (request.getSyncStateSetEpoch() != replicasInfo.getSyncStateSetEpoch()) {
+                log.info("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{}, not {}",
+                    replicasInfo.getSyncStateSetEpoch(), request.getSyncStateSetEpoch());
+                response.setErrorCode(ErrorCodes.FENCED_SYNC_STATE_SET_EPOCH.getCode());
+                return result;
+            }
+
+            // Check newSyncStateSet correctness
+            // Every proposed member must already have an entry in this broker group's id table.
+            for (String replicas : newSyncStateSet) {
+                if (!brokerIdTable.containsKey(replicas)) {
+                    log.info("Rejecting alter syncStateSet request because the replicas {} don't exist", replicas);
+                    response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                    return result;
+                }
+                // todo: check whether the replicas is active
+            }
+            // The proposed set must still include the current master.
+            if (!newSyncStateSet.contains(replicasInfo.getMasterAddress())) {
+                log.info("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {}", replicasInfo.getMasterAddress());
+                response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                return result;
+            }
+
+            // Generate event
+            response.setNewSyncStateSetEpoch(replicasInfo.getSyncStateSetEpoch() + 1);
+            response.setNewSyncStateSet(newSyncStateSet);
+            final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet);
+            result.addEvent(event);
+
+            return result;
+        }
+        // Broker name not accepted by isContainsBroker: reject without logging.
+        response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+        return result;
+    }
+
+    /**
+     * Chooses a new master for a broker group from its current syncStateSet and produces the
+     * corresponding event. This method does not modify the tables itself.
+     *
+     * @param request carries the name of the broker group that needs a new master
+     * @return a result holding the new master address and incremented master epoch on success;
+     *         MASTER_NOT_AVAILABLE (with a failure ElectMasterEvent attached) when no candidate
+     *         exists; INVALID_REQUEST when the broker name is not accepted by isContainsBroker
+     */
+    public ControllerResult<ElectMasterResponseHeader> electMaster(final ElectMasterRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<ElectMasterResponseHeader> result = new ControllerResult<>(new ElectMasterResponseHeader());
+        final ElectMasterResponseHeader response = result.getResponse();
+        if (isContainsBroker(brokerName)) {
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final Set<String> syncStateSet = replicasInfo.getSyncStateSet();
+            // A set with a single member offers no candidate in this loop, so it is skipped entirely.
+            if (syncStateSet.size() > 1) {
+                // The first member that is not the current master wins.
+                // NOTE(review): which member comes first depends on the Set's iteration order - confirm the
+                // choice is acceptable / deterministic for the Set implementation used.
+                for (String replicas : syncStateSet) {
+                    if (replicas.equals(replicasInfo.getMasterAddress())) {
+                        continue;
+                    }
+                    // todo: check whether the replicas is active
+                    response.setNewMasterAddress(replicas);
+                    response.setMasterEpoch(replicasInfo.getMasterEpoch() + 1);
+
+                    final ElectMasterEvent event = new ElectMasterEvent(brokerName, replicas);
+                    result.addEvent(event);
+                    return result;
+                }
+            }
+            // If elect failed, we still need to apply an ElectMasterEvent to tell the statemachine
+            // that the master was shutdown and no new master was elected.
+            final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
+            result.addEvent(event);
+            response.setErrorCode(ErrorCodes.MASTER_NOT_AVAILABLE.getCode());
+            return result;
+        }
+        result.getResponse().setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+        return result;
+    }
+
+    public ControllerResult<RegisterBrokerResponseHeader> registerBroker(final RegisterBrokerRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final String brokerAddress = request.getBrokerAddress();
+        final ControllerResult<RegisterBrokerResponseHeader> result = new ControllerResult<>(new RegisterBrokerResponseHeader());
+        final RegisterBrokerResponseHeader response = result.getResponse();
+        boolean canBeElectedAsMaster;
+        if (isContainsBroker(brokerName)) {
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+
+            // Get brokerId.
+            long brokerId;
+            if (!brokerIdTable.containsKey(brokerAddress)) {
+                // If this broker replicas is first time come online, we need to apply a new id for this replicas.
+                brokerId = brokerInfo.newBrokerId();
+                final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(request.getBrokerName(),
+                    brokerAddress, brokerId);
+                result.addEvent(applyIdEvent);

Review Comment:
   好的,那我们就得去想一下如何recover,而在recover完成前是无法提供服务的,因为内存中的数据是不可靠的



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java:
##########
@@ -238,6 +270,11 @@ private void registerProcessor() {
             this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);
 
             this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
+
+            if (namesrvConfig.isStartupController()) {
+                final ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this.controller);
+                this.remotingServer.registerProcessor(RequestCode.CONTROLLER_REQUEST, controllerRequestProcessor, this.controllerRequestExecutor);

Review Comment:
   这里是不是忘了注册其他的requestCode



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java:
##########
@@ -82,24 +85,36 @@ public class NamesrvController {
 
     private ExecutorService defaultExecutor;
     private ExecutorService clientRequestExecutor;
+    private ExecutorService controllerRequestExecutor;
 
     private BlockingQueue<Runnable> defaultThreadPoolQueue;
     private BlockingQueue<Runnable> clientRequestThreadPoolQueue;
+    private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
 
     private Configuration configuration;
     private FileWatchService fileWatchService;
 
+    private Controller controller;
+
     public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
         this(namesrvConfig, nettyServerConfig, new NettyClientConfig());
     }
 
-    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) {
+    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig,
+        NettyClientConfig nettyClientConfig) {
         this.namesrvConfig = namesrvConfig;
         this.nettyServerConfig = nettyServerConfig;
         this.nettyClientConfig = nettyClientConfig;
         this.kvConfigManager = new KVConfigManager(this);
         this.brokerHousekeepingService = new BrokerHousekeepingService(this);
-        this.routeInfoManager = new RouteInfoManager(namesrvConfig, this);
+        if (namesrvConfig.isStartupController()) {
+            final DLedgerConfig config = new DLedgerConfig();
+            config.setGroup(namesrvConfig.getControllerDLegerGroup());
+            config.setPeers(namesrvConfig.getControllerDLegerPeers());
+            config.setSelfId(namesrvConfig.getControllerDLegerSelfId());
+            this.controller = new DledgerController(config, namesrvConfig.isEnableElectUncleanMaster());

Review Comment:
   这样的构造方式可能会导致:运行时通过admin工具变更namesrvConfig中的enableElectUncleanMaster配置后不起作用。或者应该传入controllerConfig(namesrvConfig)



##########
common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java:
##########
@@ -73,6 +73,27 @@ public class NamesrvConfig {
 
     private volatile boolean enableAllTopicList = false;
 
+    /**
+     * Dledger controller config
+     */
+    private boolean isStartupController = false;
+    /**
+     * Indicates the nums of thread to handle broker or operation requests, like REGISTER_BROKER.
+     */
+    private int controllerThreadPoolNums = 16;
+    /**
+     * Indicates the capacity of queue to hold client requests.
+     */
+    private int controllerRequestThreadPoolQueueCapacity = 50000;
+    private String controllerDLegerGroup;
+    private String controllerDLegerPeers;
+    private String controllerDLegerSelfId;
+    private String controllerStorePath = System.getProperty("user.home") + File.separator + "DledgerController";
+    /**
+     * Whether the controller can elect a master which is not in the syncStateSet.
+     */
+    private boolean enableElectUncleanMaster = false;

Review Comment:
   建议单独搞个Controller的配置,在启动的时候(createNamesrvController)MixAll.properties2Object(properties, controllerConfig);即可



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/InSyncReplicasInfo.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * In sync replicas info, manages the master and syncStateSet of a broker.
+ */
+public class InSyncReplicasInfo {
+    private final String clusterName;
+    private final String brokerName;
+
+    private Set<String/*Address*/> syncStateSet;
+    private int syncStateSetEpoch;
+
+    private String masterAddress;
+    private int masterEpoch;
+    // Because when a Broker becomes a master, its id needs to be assigned a value of 0.
+    // We need to record it's originId so that when it becomes a follower again, we can find its original id.
+    private long masterOriginId;

Review Comment:
   或许我们不需要记录masterOriginId,只要保持一个规则,master的id为0,其他的id为原编号,而brokerIdTable也只需要记录分配的编号即可,不需要一直变换



##########
common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ErrorCodes.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+/**
+ * @author hzh
+ * @email 642256541@qq.com
+ * @date 2022/4/16 20:01
+ */
+public enum ErrorCodes {
+
+    NONE((short) 0, "No error"),
+    FENCED_LEADER_EPOCH((short) 1, "The leader epoch in the request is older than the current epoch"),
+    FENCED_SYNC_STATE_SET_EPOCH((short) 2, "The syncStateSet epoch in the request is older than the current epoch"),
+    INVALID_REQUEST((short) 3, "The request is invalid"),
+    MASTER_NOT_AVAILABLE((short) 4, "There is no available master for this broker.");

Review Comment:
   建议和ResponseCode做一些融合



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/InSyncReplicasInfo.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * In sync replicas info, manages the master and syncStateSet of a broker.
+ */
+public class InSyncReplicasInfo {
+    private final String clusterName;
+    private final String brokerName;
+
+    private Set<String/*Address*/> syncStateSet;
+    private int syncStateSetEpoch;
+
+    private String masterAddress;
+    private int masterEpoch;
+    // Because when a Broker becomes a master, its id needs to be assigned a value of 0.
+    // We need to record it's originId so that when it becomes a follower again, we can find its original id.
+    private long masterOriginId;
+
+    public InSyncReplicasInfo(String clusterName, String brokerName, String masterAddress) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.masterAddress = masterAddress;
+        this.masterEpoch = 1;
+        // The first master is the first online broker
+        this.masterOriginId = 1;
+        this.syncStateSet = new HashSet<>();
+        this.syncStateSet.add(masterAddress);
+        this.syncStateSetEpoch = 1;
+    }
+
+    public void updateMasterInfo(String masterAddress, long masterOriginId) {
+        this.masterAddress = masterAddress;
+        this.masterOriginId = masterOriginId;
+        this.masterEpoch++;
+    }
+
+    public void updateSyncStateSetInfo(Set<String> newSyncStateSet) {
+        this.syncStateSet = newSyncStateSet;
+        this.syncStateSetEpoch++;
+    }
+
+    public boolean isMasterAlive() {

Review Comment:
   或者叫isMasterExist更好



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ErrorCodes;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.manager.event.AlterSyncStateSetEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ApplyBrokerIdEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventType;
+
+/**
+ * The manager that manages the replicas info for all brokers.
+ * We can think of this class as the controller's memory state machine
+ * It should be noted that this class is not thread safe,
+ * and the upper layer needs to ensure that it can be called sequentially
+ */
+public class ReplicasInfoManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private static final Long LEADER_ID = 0L;

Review Comment:
   利用MixAll.MASTER_ID即可



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ErrorCodes;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.manager.event.AlterSyncStateSetEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ApplyBrokerIdEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventType;
+
+/**
+ * The manager that manages the replicas info for all brokers.
+ * We can think of this class as the controller's memory state machine
+ * It should be noted that this class is not thread safe,
+ * and the upper layer needs to ensure that it can be called sequentially
+ */
+public class ReplicasInfoManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private static final Long LEADER_ID = 0L;
+    private final boolean enableElectUncleanMaster;
+    private final Map<String/* brokerName */, BrokerIdInfo> replicaInfoTable;
+    private final Map<String/* brokerName */, InSyncReplicasInfo> inSyncReplicasInfoTable;
+
+    public ReplicasInfoManager(final boolean enableElectUncleanMaster) {
+        this.enableElectUncleanMaster = enableElectUncleanMaster;
+        this.replicaInfoTable = new HashMap<>();
+        this.inSyncReplicasInfoTable = new HashMap<>();
+    }
+
+    /********************************    The following methods don't update statemachine, triggered by controller leader   ********************************/
+
+    public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
+        final AlterSyncStateSetRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<AlterSyncStateSetResponseHeader> result = new ControllerResult<>(new AlterSyncStateSetResponseHeader());
+        final AlterSyncStateSetResponseHeader response = result.getResponse();
+
+        if (isContainsBroker(brokerName)) {
+            final Set<String> newSyncStateSet = request.getNewSyncStateSet();
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+
+            // Check master
+            if (!replicasInfo.getMasterAddress().equals(request.getMasterAddress())) {
+                log.info("Rejecting alter syncStateSet request because the current leader is:{}, not {}",
+                    replicasInfo.getMasterAddress(), request.getMasterAddress());
+                response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                return result;
+            }
+
+            // Check master epoch
+            if (request.getMasterEpoch() != replicasInfo.getMasterEpoch()) {
+                log.info("Rejecting alter syncStateSet request because the current master epoch is:{}, not {}",
+                    replicasInfo.getMasterEpoch(), request.getMasterEpoch());
+                response.setErrorCode(ErrorCodes.FENCED_LEADER_EPOCH.getCode());
+                return result;
+            }
+
+            // Check syncStateSet epoch
+            if (request.getSyncStateSetEpoch() != replicasInfo.getSyncStateSetEpoch()) {
+                log.info("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{}, not {}",
+                    replicasInfo.getSyncStateSetEpoch(), request.getSyncStateSetEpoch());
+                response.setErrorCode(ErrorCodes.FENCED_SYNC_STATE_SET_EPOCH.getCode());
+                return result;
+            }
+
+            // Check newSyncStateSet correctness
+            for (String replicas : newSyncStateSet) {
+                if (!brokerIdTable.containsKey(replicas)) {
+                    log.info("Rejecting alter syncStateSet request because the replicas {} don't exist", replicas);
+                    response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                    return result;
+                }
+                // todo: check whether the replicas is active
+            }
+            if (!newSyncStateSet.contains(replicasInfo.getMasterAddress())) {
+                log.info("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {}", replicasInfo.getMasterAddress());
+                response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                return result;
+            }
+
+            // Generate event
+            response.setNewSyncStateSetEpoch(replicasInfo.getSyncStateSetEpoch() + 1);
+            response.setNewSyncStateSet(newSyncStateSet);
+            final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet);
+            result.addEvent(event);
+
+            return result;
+        }
+        response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+        return result;
+    }
+
+    public ControllerResult<ElectMasterResponseHeader> electMaster(final ElectMasterRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<ElectMasterResponseHeader> result = new ControllerResult<>(new ElectMasterResponseHeader());
+        final ElectMasterResponseHeader response = result.getResponse();
+        if (isContainsBroker(brokerName)) {
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final Set<String> syncStateSet = replicasInfo.getSyncStateSet();
+            if (syncStateSet.size() > 1) {
+                for (String replicas : syncStateSet) {
+                    if (replicas.equals(replicasInfo.getMasterAddress())) {
+                        continue;
+                    }
+                    // todo: check whether the replicas is active
+                    response.setNewMasterAddress(replicas);
+                    response.setMasterEpoch(replicasInfo.getMasterEpoch() + 1);
+
+                    final ElectMasterEvent event = new ElectMasterEvent(brokerName, replicas);
+                    result.addEvent(event);
+                    return result;
+                }
+            }
+            // If elect failed, we still need to apply an ElectMasterEvent to tell the statemachine
+            // that the master was shutdown and no new master was elected.
+            final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
+            result.addEvent(event);
+            response.setErrorCode(ErrorCodes.MASTER_NOT_AVAILABLE.getCode());
+            return result;
+        }
+        result.getResponse().setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+        return result;
+    }
+
+    public ControllerResult<RegisterBrokerResponseHeader> registerBroker(final RegisterBrokerRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final String brokerAddress = request.getBrokerAddress();
+        final ControllerResult<RegisterBrokerResponseHeader> result = new ControllerResult<>(new RegisterBrokerResponseHeader());
+        final RegisterBrokerResponseHeader response = result.getResponse();
+        boolean canBeElectedAsMaster;
+        if (isContainsBroker(brokerName)) {
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+
+            // Get brokerId.
+            long brokerId;
+            if (!brokerIdTable.containsKey(brokerAddress)) {
+                // If this broker replicas is first time come online, we need to apply a new id for this replicas.
+                brokerId = brokerInfo.newBrokerId();
+                final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(request.getBrokerName(),
+                    brokerAddress, brokerId);
+                result.addEvent(applyIdEvent);
+            } else {
+                brokerId = brokerIdTable.get(brokerAddress);
+            }
+            response.setBrokerId(brokerId);
+            response.setMasterEpoch(replicasInfo.getMasterEpoch());
+
+            if (replicasInfo.isMasterAlive()) {
+                // If the master is alive, just return master info.
+                response.setMasterAddress(replicasInfo.getMasterAddress());
+                return result;
+            } else {
+                // If the master is not alive, we should elect a new master:
+                // Case1: This replicas was in sync state set list
+                // Case2: The option {EnableElectUncleanMaster} is true
+                canBeElectedAsMaster = replicasInfo.getSyncStateSet().contains(brokerAddress) || this.enableElectUncleanMaster;
+            }
+        } else {
+            // If the broker's metadata does not exist in the state machine, the replicas can be elected as master directly.
+            canBeElectedAsMaster = true;
+        }
+
+        if (canBeElectedAsMaster) {
+            int masterEpoch = this.inSyncReplicasInfoTable.containsKey(brokerName) ?
+                this.inSyncReplicasInfoTable.get(brokerName).getMasterEpoch() + 1 : 1;
+            response.setMasterAddress(request.getBrokerAddress());
+            response.setMasterEpoch(masterEpoch);
+            response.setBrokerId(0);
+
+            final ElectMasterEvent event = new ElectMasterEvent(true, brokerName, brokerAddress, request.getClusterName());
+            result.addEvent(event);
+            return result;
+        }

Review Comment:
   这里似乎在brokerName不存在时,不会为对应的broker去申请brokerId



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ErrorCodes;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.manager.event.AlterSyncStateSetEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ApplyBrokerIdEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventType;
+
+/**
+ * The manager that manages the replicas info for all brokers.
+ * We can think of this class as the controller's memory state machine
+ * It should be noted that this class is not thread safe,
+ * and the upper layer needs to ensure that it can be called sequentially
+ */
+public class ReplicasInfoManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private static final Long LEADER_ID = 0L;
+    private final boolean enableElectUncleanMaster;
+    private final Map<String/* brokerName */, BrokerIdInfo> replicaInfoTable;
+    private final Map<String/* brokerName */, InSyncReplicasInfo> inSyncReplicasInfoTable;
+
+    public ReplicasInfoManager(final boolean enableElectUncleanMaster) {
+        this.enableElectUncleanMaster = enableElectUncleanMaster;
+        this.replicaInfoTable = new HashMap<>();
+        this.inSyncReplicasInfoTable = new HashMap<>();
+    }
+
+    /********************************    The following methods don't update statemachine, triggered by controller leader   ********************************/
+
+    public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
+        final AlterSyncStateSetRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<AlterSyncStateSetResponseHeader> result = new ControllerResult<>(new AlterSyncStateSetResponseHeader());
+        final AlterSyncStateSetResponseHeader response = result.getResponse();
+
+        if (isContainsBroker(brokerName)) {
+            final Set<String> newSyncStateSet = request.getNewSyncStateSet();
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+
+            // Check master
+            if (!replicasInfo.getMasterAddress().equals(request.getMasterAddress())) {
+                log.info("Rejecting alter syncStateSet request because the current leader is:{}, not {}",
+                    replicasInfo.getMasterAddress(), request.getMasterAddress());
+                response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                return result;
+            }
+
+            // Check master epoch
+            if (request.getMasterEpoch() != replicasInfo.getMasterEpoch()) {
+                log.info("Rejecting alter syncStateSet request because the current master epoch is:{}, not {}",
+                    replicasInfo.getMasterEpoch(), request.getMasterEpoch());
+                response.setErrorCode(ErrorCodes.FENCED_LEADER_EPOCH.getCode());
+                return result;
+            }
+
+            // Check syncStateSet epoch
+            if (request.getSyncStateSetEpoch() != replicasInfo.getSyncStateSetEpoch()) {
+                log.info("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{}, not {}",
+                    replicasInfo.getSyncStateSetEpoch(), request.getSyncStateSetEpoch());
+                response.setErrorCode(ErrorCodes.FENCED_SYNC_STATE_SET_EPOCH.getCode());
+                return result;
+            }
+
+            // Check newSyncStateSet correctness
+            for (String replicas : newSyncStateSet) {
+                if (!brokerIdTable.containsKey(replicas)) {
+                    log.info("Rejecting alter syncStateSet request because the replicas {} don't exist", replicas);
+                    response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                    return result;
+                }
+                // todo: check whether the replicas is active
+            }
+            if (!newSyncStateSet.contains(replicasInfo.getMasterAddress())) {
+                log.info("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {}", replicasInfo.getMasterAddress());
+                response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                return result;
+            }
+
+            // Generate event
+            response.setNewSyncStateSetEpoch(replicasInfo.getSyncStateSetEpoch() + 1);
+            response.setNewSyncStateSet(newSyncStateSet);
+            final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet);
+            result.addEvent(event);
+
+            return result;
+        }
+        response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+        return result;
+    }
+
+    public ControllerResult<ElectMasterResponseHeader> electMaster(final ElectMasterRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<ElectMasterResponseHeader> result = new ControllerResult<>(new ElectMasterResponseHeader());
+        final ElectMasterResponseHeader response = result.getResponse();
+        if (isContainsBroker(brokerName)) {
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final Set<String> syncStateSet = replicasInfo.getSyncStateSet();
+            if (syncStateSet.size() > 1) {
+                for (String replicas : syncStateSet) {
+                    if (replicas.equals(replicasInfo.getMasterAddress())) {
+                        continue;
+                    }
+                    // todo: check whether the replicas is active
+                    response.setNewMasterAddress(replicas);
+                    response.setMasterEpoch(replicasInfo.getMasterEpoch() + 1);
+
+                    final ElectMasterEvent event = new ElectMasterEvent(brokerName, replicas);
+                    result.addEvent(event);
+                    return result;
+                }
+            }
+            // If elect failed, we still need to apply an ElectMasterEvent to tell the statemachine
+            // that the master was shutdown and no new master was elected.
+            final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
+            result.addEvent(event);
+            response.setErrorCode(ErrorCodes.MASTER_NOT_AVAILABLE.getCode());
+            return result;
+        }
+        result.getResponse().setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+        return result;
+    }
+
+    public ControllerResult<RegisterBrokerResponseHeader> registerBroker(final RegisterBrokerRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final String brokerAddress = request.getBrokerAddress();
+        final ControllerResult<RegisterBrokerResponseHeader> result = new ControllerResult<>(new RegisterBrokerResponseHeader());
+        final RegisterBrokerResponseHeader response = result.getResponse();
+        boolean canBeElectedAsMaster;
+        if (isContainsBroker(brokerName)) {
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+
+            // Get brokerId.
+            long brokerId;
+            if (!brokerIdTable.containsKey(brokerAddress)) {
+                // If this broker replicas is first time come online, we need to apply a new id for this replicas.
+                brokerId = brokerInfo.newBrokerId();
+                final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(request.getBrokerName(),
+                    brokerAddress, brokerId);
+                result.addEvent(applyIdEvent);
+            } else {
+                brokerId = brokerIdTable.get(brokerAddress);
+            }
+            response.setBrokerId(brokerId);
+            response.setMasterEpoch(replicasInfo.getMasterEpoch());
+
+            if (replicasInfo.isMasterAlive()) {
+                // If the master is alive, just return master info.
+                response.setMasterAddress(replicasInfo.getMasterAddress());
+                return result;
+            } else {
+                // If the master is not alive, we should elect a new master:
+                // Case1: This replicas was in sync state set list
+                // Case2: The option {EnableElectUncleanMaster} is true
+                canBeElectedAsMaster = replicasInfo.getSyncStateSet().contains(brokerAddress) || this.enableElectUncleanMaster;

Review Comment:
   enableElectUncleanMaster应该是在electMaster时发现SyncStateSet中没有可选broker时,从所有存活的broker中挑选



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ErrorCodes;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.manager.event.AlterSyncStateSetEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ApplyBrokerIdEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventType;
+
+/**
+ * The manager that manages the replicas info for all brokers.
+ * We can think of this class as the controller's memory state machine
+ * It should be noted that this class is not thread safe,
+ * and the upper layer needs to ensure that it can be called sequentially
+ */
+public class ReplicasInfoManager {
+    // Logger obtained under the controller logger name.
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    // NOTE(review): presumably the broker id reserved for the master replica; it is not
+    // referenced in the visible part of this class - confirm before relying on it.
+    private static final Long LEADER_ID = 0L;
+    // When true, registerBroker allows a replica that is not in the sync state set to be
+    // elected as master if the current master is not alive.
+    private final boolean enableElectUncleanMaster;
+    // Per broker name: the BrokerIdInfo holding the broker-address -> broker-id table.
+    private final Map<String/* brokerName */, BrokerIdInfo> replicaInfoTable;
+    // Per broker name: the InSyncReplicasInfo (master address, master epoch, sync state set and its epoch).
+    private final Map<String/* brokerName */, InSyncReplicasInfo> inSyncReplicasInfoTable;
+
+    /**
+     * Creates a manager with empty replica tables.
+     *
+     * @param enableElectUncleanMaster whether a registering replica that is not in the
+     *                                 sync state set may be elected as master when the
+     *                                 current master is not alive
+     */
+    public ReplicasInfoManager(final boolean enableElectUncleanMaster) {
+        // Both tables start empty and are keyed by broker name.
+        this.inSyncReplicasInfoTable = new HashMap<>();
+        this.replicaInfoTable = new HashMap<>();
+        this.enableElectUncleanMaster = enableElectUncleanMaster;
+    }
+
+    /********************************    The following methods don't update statemachine, triggered by controller leader   ********************************/
+
+    /**
+     * Validates a request to replace the sync state set of a broker group and, when the
+     * request is acceptable, attaches an {@link AlterSyncStateSetEvent} to the result.
+     * The request is rejected (error code set on the response, no event added) when:
+     * the broker name is unknown, the requesting master is not the recorded master,
+     * the master epoch or sync state set epoch does not match the recorded one,
+     * the new set names a replica without a broker id, or the new set omits the master.
+     *
+     * @param request the alter request carrying broker name, master address, epochs and the new set
+     * @return the result holding the response header and, on success, the generated event
+     */
+    public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
+        final AlterSyncStateSetRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<AlterSyncStateSetResponseHeader> result = new ControllerResult<>(new AlterSyncStateSetResponseHeader());
+        final AlterSyncStateSetResponseHeader response = result.getResponse();
+
+        // Unknown broker group: there is nothing to alter.
+        if (!isContainsBroker(brokerName)) {
+            response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+            return result;
+        }
+
+        final Set<String> requestedSyncStateSet = request.getNewSyncStateSet();
+        final InSyncReplicasInfo syncInfo = this.inSyncReplicasInfoTable.get(brokerName);
+        final BrokerIdInfo idInfo = this.replicaInfoTable.get(brokerName);
+        final HashMap<String, Long> knownReplicas = idInfo.getBrokerIdTable();
+        final String currentMaster = syncInfo.getMasterAddress();
+
+        // Only the recorded master may alter the sync state set.
+        if (!currentMaster.equals(request.getMasterAddress())) {
+            log.info("Rejecting alter syncStateSet request because the current leader is:{}, not {}",
+                currentMaster, request.getMasterAddress());
+            response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+            return result;
+        }
+
+        // The request must carry the recorded master epoch.
+        if (syncInfo.getMasterEpoch() != request.getMasterEpoch()) {
+            log.info("Rejecting alter syncStateSet request because the current master epoch is:{}, not {}",
+                syncInfo.getMasterEpoch(), request.getMasterEpoch());
+            response.setErrorCode(ErrorCodes.FENCED_LEADER_EPOCH.getCode());
+            return result;
+        }
+
+        // The request must carry the recorded sync state set epoch.
+        if (syncInfo.getSyncStateSetEpoch() != request.getSyncStateSetEpoch()) {
+            log.info("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{}, not {}",
+                syncInfo.getSyncStateSetEpoch(), request.getSyncStateSetEpoch());
+            response.setErrorCode(ErrorCodes.FENCED_SYNC_STATE_SET_EPOCH.getCode());
+            return result;
+        }
+
+        // Every member of the new set must already have a broker id.
+        for (String member : requestedSyncStateSet) {
+            // todo: check whether the replicas is active
+            if (!knownReplicas.containsKey(member)) {
+                log.info("Rejecting alter syncStateSet request because the replicas {} don't exist", member);
+                response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                return result;
+            }
+        }
+
+        // The master itself must stay in the sync state set.
+        if (!requestedSyncStateSet.contains(currentMaster)) {
+            log.info("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {}", currentMaster);
+            response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+            return result;
+        }
+
+        // Accepted: report the next epoch and hand the change over as an event.
+        response.setNewSyncStateSetEpoch(syncInfo.getSyncStateSetEpoch() + 1);
+        response.setNewSyncStateSet(requestedSyncStateSet);
+        result.addEvent(new AlterSyncStateSetEvent(brokerName, requestedSyncStateSet));
+        return result;
+    }
+
+    /**
+     * Tries to choose a new master for the broker group named in the request.
+     * The first member of the sync state set that is not the recorded master is chosen;
+     * this is only attempted when the set has more than one member. On success the
+     * response carries the new master address and the recorded master epoch plus one,
+     * and an {@link ElectMasterEvent} for the new master is attached. If no candidate
+     * is found, an {@link ElectMasterEvent} built with {@code false} is attached and the
+     * response gets {@code MASTER_NOT_AVAILABLE}. An unknown broker name yields
+     * {@code INVALID_REQUEST} and no event.
+     *
+     * @param request the elect request carrying the broker name
+     * @return the result holding the response header and any generated event
+     */
+    public ControllerResult<ElectMasterResponseHeader> electMaster(final ElectMasterRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<ElectMasterResponseHeader> result = new ControllerResult<>(new ElectMasterResponseHeader());
+        final ElectMasterResponseHeader response = result.getResponse();
+
+        // Unknown broker group: reject without generating any event.
+        if (!isContainsBroker(brokerName)) {
+            result.getResponse().setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+            return result;
+        }
+
+        final InSyncReplicasInfo syncInfo = this.inSyncReplicasInfoTable.get(brokerName);
+        final Set<String> candidates = syncInfo.getSyncStateSet();
+        if (candidates.size() > 1) {
+            for (String candidate : candidates) {
+                // todo: check whether the replicas is active
+                if (!candidate.equals(syncInfo.getMasterAddress())) {
+                    response.setNewMasterAddress(candidate);
+                    response.setMasterEpoch(syncInfo.getMasterEpoch() + 1);
+                    result.addEvent(new ElectMasterEvent(brokerName, candidate));
+                    return result;
+                }
+            }
+        }
+
+        // If elect failed, we still need to apply an ElectMasterEvent to tell the statemachine
+        // that the master was shutdown and no new master was elected.
+        result.addEvent(new ElectMasterEvent(false, brokerName));
+        response.setErrorCode(ErrorCodes.MASTER_NOT_AVAILABLE.getCode());
+        return result;
+    }
+
+    /**
+     * Handles a broker replica registering with the controller.
+     * For a known broker group the replica's broker id is looked up, or a new one is
+     * requested and an {@link ApplyBrokerIdEvent} attached; if the recorded master is
+     * alive, its address is returned right away. Otherwise the replica may become master
+     * when it is in the sync state set or {@code enableElectUncleanMaster} is true.
+     * For an unknown broker group the replica may become master directly.
+     * When the replica becomes master, the response carries its address, broker id 0 and
+     * the new master epoch, and an {@link ElectMasterEvent} is attached. When it may not,
+     * the response carries an empty master address and {@code INVALID_REQUEST}.
+     *
+     * @param request the register request carrying cluster name, broker name and broker address
+     * @return the result holding the response header and any generated events
+     */
+    public ControllerResult<RegisterBrokerResponseHeader> registerBroker(final RegisterBrokerRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final String brokerAddress = request.getBrokerAddress();
+        final ControllerResult<RegisterBrokerResponseHeader> result = new ControllerResult<>(new RegisterBrokerResponseHeader());
+        final RegisterBrokerResponseHeader response = result.getResponse();
+
+        final boolean electable;
+        if (!isContainsBroker(brokerName)) {
+            // If the broker's metadata does not exist in the state machine, the replicas can be elected as master directly.
+            electable = true;
+        } else {
+            final InSyncReplicasInfo syncInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final BrokerIdInfo idInfo = this.replicaInfoTable.get(brokerName);
+            final HashMap<String, Long> idsByAddress = idInfo.getBrokerIdTable();
+
+            // Resolve the broker id of this replica.
+            final long assignedId;
+            if (idsByAddress.containsKey(brokerAddress)) {
+                assignedId = idsByAddress.get(brokerAddress);
+            } else {
+                // First time this replica comes online: apply a new id for it.
+                assignedId = idInfo.newBrokerId();
+                result.addEvent(new ApplyBrokerIdEvent(request.getBrokerName(), brokerAddress, assignedId));
+            }
+            response.setBrokerId(assignedId);
+            response.setMasterEpoch(syncInfo.getMasterEpoch());
+
+            if (syncInfo.isMasterAlive()) {
+                // The master is alive, so just report it.
+                response.setMasterAddress(syncInfo.getMasterAddress());
+                return result;
+            }
+
+            // The master is not alive, so this replica may take over when:
+            // Case1: it was in the sync state set
+            // Case2: the option {EnableElectUncleanMaster} is true
+            electable = syncInfo.getSyncStateSet().contains(brokerAddress) || this.enableElectUncleanMaster;
+        }
+
+        if (!electable) {
+            response.setMasterAddress("");
+            response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+            return result;
+        }
+
+        // The new epoch is the recorded epoch plus one, or 1 for a brand-new broker group.
+        int masterEpoch;
+        if (this.inSyncReplicasInfoTable.containsKey(brokerName)) {
+            masterEpoch = this.inSyncReplicasInfoTable.get(brokerName).getMasterEpoch() + 1;
+        } else {
+            masterEpoch = 1;
+        }
+        response.setMasterAddress(request.getBrokerAddress());
+        response.setMasterEpoch(masterEpoch);
+        response.setBrokerId(0);
+
+        result.addEvent(new ElectMasterEvent(true, brokerName, brokerAddress, request.getClusterName()));
+        return result;
+    }
+
+    public ControllerResult<GetReplicaInfoResponseHeader> getReplicaInfo(final GetReplicaInfoRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<GetReplicaInfoResponseHeader> result = new ControllerResult<>(new GetReplicaInfoResponseHeader());
+        final GetReplicaInfoResponseHeader response = result.getResponse();
+        if (isContainsBroker(brokerName)) {
+            // If exist broker metadata, just return metadata
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            response.setMasterAddress(replicasInfo.getMasterAddress());
+            response.setMasterEpoch(replicasInfo.getMasterEpoch());
+            response.setSyncStateSet(replicasInfo.getSyncStateSet());
+            response.setSyncStateSetEpoch(replicasInfo.getSyncStateSetEpoch());
+            return result;
+        }
+        result.getResponse().setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());

Review Comment:
   除了都返回INVALID_REQUEST,可能需要一些更加详细的错误码



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org