You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/04/22 10:20:39 UTC

[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4195: Dledger controller

RongtongJin commented on code in PR #4195:
URL: https://github.com/apache/rocketmq/pull/4195#discussion_r855814522


##########
common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ErrorCodes.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+/**
+ * @author hzh
+ * @email 642256541@qq.com
+ * @date 2022/4/16 20:01
+ */

Review Comment:
   Do not leave author information



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.processor;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ELECT_MASTER;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_METADATA_INFO;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_REPLICA_INFO;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_REGISTER_BROKER;
+
+/**
+ * Processor for controller request
+ */
+public class ControllerRequestProcessor implements NettyRequestProcessor {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private static final int WAIT_TIMEOUT_OUT = 10;

Review Comment:
   Is the timeout here too long? Maybe 5 seconds better



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ErrorCodes;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.manager.event.AlterSyncStateSetEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ApplyBrokerIdEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventType;
+
+/**
+ * The manager that manages the replicas info for all brokers.
+ * We can think of this class as the controller's memory state machine
+ * It should be noted that this class is not thread safe,
+ * and the upper layer needs to ensure that it can be called sequentially
+ */
+public class ReplicasInfoManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private static final Long LEADER_ID = 0L;
+    private final boolean enableElectUncleanMaster;
+    private final Map<String/* brokerName */, BrokerIdInfo> replicaInfoTable;
+    private final Map<String/* brokerName */, InSyncReplicasInfo> inSyncReplicasInfoTable;
+
+    public ReplicasInfoManager(final boolean enableElectUncleanMaster) {
+        this.enableElectUncleanMaster = enableElectUncleanMaster;
+        this.replicaInfoTable = new HashMap<>();
+        this.inSyncReplicasInfoTable = new HashMap<>();
+    }
+
+    /********************************    The following methods don't update statemachine, triggered by controller leader   ********************************/

Review Comment:
   It is would be better to follow the rocketmq comment format



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ErrorCodes;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.manager.event.AlterSyncStateSetEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ApplyBrokerIdEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventType;
+
+/**
+ * The manager that manages the replicas info for all brokers.
+ * We can think of this class as the controller's memory state machine
+ * It should be noted that this class is not thread safe,
+ * and the upper layer needs to ensure that it can be called sequentially
+ */
+public class ReplicasInfoManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private static final Long LEADER_ID = 0L;
+    private final boolean enableElectUncleanMaster;
+    private final Map<String/* brokerName */, BrokerIdInfo> replicaInfoTable;
+    private final Map<String/* brokerName */, InSyncReplicasInfo> inSyncReplicasInfoTable;
+
+    public ReplicasInfoManager(final boolean enableElectUncleanMaster) {
+        this.enableElectUncleanMaster = enableElectUncleanMaster;
+        this.replicaInfoTable = new HashMap<>();
+        this.inSyncReplicasInfoTable = new HashMap<>();
+    }
+
+    /********************************    The following methods don't update statemachine, triggered by controller leader   ********************************/
+
+    public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
+        final AlterSyncStateSetRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<AlterSyncStateSetResponseHeader> result = new ControllerResult<>(new AlterSyncStateSetResponseHeader());
+        final AlterSyncStateSetResponseHeader response = result.getResponse();
+
+        if (isContainsBroker(brokerName)) {
+            final Set<String> newSyncStateSet = request.getNewSyncStateSet();
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+
+            // Check master
+            if (!replicasInfo.getMasterAddress().equals(request.getMasterAddress())) {
+                log.info("Rejecting alter syncStateSet request because the current leader is:{}, not {}",
+                    replicasInfo.getMasterAddress(), request.getMasterAddress());
+                response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                return result;
+            }
+
+            // Check master epoch
+            if (request.getMasterEpoch() != replicasInfo.getMasterEpoch()) {
+                log.info("Rejecting alter syncStateSet request because the current master epoch is:{}, not {}",
+                    replicasInfo.getMasterEpoch(), request.getMasterEpoch());
+                response.setErrorCode(ErrorCodes.FENCED_LEADER_EPOCH.getCode());
+                return result;
+            }
+
+            // Check syncStateSet epoch
+            if (request.getSyncStateSetEpoch() != replicasInfo.getSyncStateSetEpoch()) {
+                log.info("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{}, not {}",
+                    replicasInfo.getSyncStateSetEpoch(), request.getSyncStateSetEpoch());
+                response.setErrorCode(ErrorCodes.FENCED_SYNC_STATE_SET_EPOCH.getCode());
+                return result;
+            }
+
+            // Check newSyncStateSet correctness
+            for (String replicas : newSyncStateSet) {
+                if (!brokerIdTable.containsKey(replicas)) {
+                    log.info("Rejecting alter syncStateSet request because the replicas {} don't exist", replicas);
+                    response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                    return result;
+                }
+                // todo: check whether the replicas is active
+            }
+            if (!newSyncStateSet.contains(replicasInfo.getMasterAddress())) {
+                log.info("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {}", replicasInfo.getMasterAddress());
+                response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                return result;
+            }
+
+            // Generate event
+            response.setNewSyncStateSetEpoch(replicasInfo.getSyncStateSetEpoch() + 1);
+            response.setNewSyncStateSet(newSyncStateSet);
+            final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet);
+            result.addEvent(event);
+
+            return result;
+        }
+        response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+        return result;
+    }
+
+    public ControllerResult<ElectMasterResponseHeader> electMaster(final ElectMasterRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<ElectMasterResponseHeader> result = new ControllerResult<>(new ElectMasterResponseHeader());
+        final ElectMasterResponseHeader response = result.getResponse();
+        if (isContainsBroker(brokerName)) {
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final Set<String> syncStateSet = replicasInfo.getSyncStateSet();
+            if (syncStateSet.size() > 1) {
+                for (String replicas : syncStateSet) {
+                    if (replicas.equals(replicasInfo.getMasterAddress())) {
+                        continue;
+                    }
+                    // todo: check whether the replicas is active
+                    response.setNewMasterAddress(replicas);
+                    response.setMasterEpoch(replicasInfo.getMasterEpoch() + 1);
+
+                    final ElectMasterEvent event = new ElectMasterEvent(brokerName, replicas);
+                    result.addEvent(event);
+                    return result;
+                }
+            }
+            // If elect failed, we still need to apply an ElectMasterEvent to tell the statemachine
+            // that the master was shutdown and no new master was elected.
+            final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
+            result.addEvent(event);
+            response.setErrorCode(ErrorCodes.MASTER_NOT_AVAILABLE.getCode());
+            return result;
+        }
+        result.getResponse().setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+        return result;
+    }
+
+    public ControllerResult<RegisterBrokerResponseHeader> registerBroker(final RegisterBrokerRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final String brokerAddress = request.getBrokerAddress();
+        final ControllerResult<RegisterBrokerResponseHeader> result = new ControllerResult<>(new RegisterBrokerResponseHeader());
+        final RegisterBrokerResponseHeader response = result.getResponse();
+        boolean canBeElectedAsMaster;
+        if (isContainsBroker(brokerName)) {
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+
+            // Get brokerId.
+            long brokerId;
+            if (!brokerIdTable.containsKey(brokerAddress)) {
+                // If this broker replicas is first time come online, we need to apply a new id for this replicas.
+                brokerId = brokerInfo.newBrokerId();
+                final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(request.getBrokerName(),
+                    brokerAddress, brokerId);
+                result.addEvent(applyIdEvent);

Review Comment:
   Will generating events based on the values in memory cause duplicate brokerId or epoch?



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.processor;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ELECT_MASTER;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_METADATA_INFO;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_REPLICA_INFO;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_REGISTER_BROKER;
+
+/**
+ * Processor for controller request
+ */
+public class ControllerRequestProcessor implements NettyRequestProcessor {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private static final int WAIT_TIMEOUT_OUT = 10;
+    private final Controller controller;
+
+
+    public ControllerRequestProcessor(final Controller controller) {
+        this.controller = controller;
+    }
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        if (ctx != null) {
+            log.debug("Receive request, {} {} {}",
+                request.getCode(),
+                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                request);
+        }
+        switch(request.getCode()) {
+            case CONTROLLER_ALTER_SYNC_STATE_SET : {
+                final AlterSyncStateSetRequestHeader controllerRequest = request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
+                final CompletableFuture<AlterSyncStateSetResponseHeader> future = this.controller.alterSyncStateSet(controllerRequest);
+                if (future != null) {
+                    final AlterSyncStateSetResponseHeader resp = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                    return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, resp);
+                }
+                break;
+            }
+            case CONTROLLER_ELECT_MASTER: {
+                final ElectMasterRequestHeader controllerRequest = request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+                final CompletableFuture<ElectMasterResponseHeader> future = this.controller.electMaster(controllerRequest);
+                if (future != null) {
+                    final ElectMasterResponseHeader resp = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                    return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, resp);
+                }
+                break;
+            }
+            case CONTROLLER_REGISTER_BROKER: {
+                final RegisterBrokerRequestHeader controllerRequest = request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
+                final CompletableFuture<RegisterBrokerResponseHeader> future = this.controller.registerBroker(controllerRequest);
+                if (future != null) {
+                    final RegisterBrokerResponseHeader resp = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                    return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, resp);
+                }
+                break;
+            }
+            case CONTROLLER_GET_REPLICA_INFO: {
+                final GetReplicaInfoRequestHeader controllerRequest = request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
+                final CompletableFuture<GetReplicaInfoResponseHeader> future = this.controller.getReplicaInfo(controllerRequest);
+                if (future != null) {
+                    final GetReplicaInfoResponseHeader resp = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                    return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, resp);
+                }
+                break;
+            }
+            case CONTROLLER_GET_METADATA_INFO: {
+                final GetMetaDataResponseHeader resp = this.controller.getControllerMetadata();
+                return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, resp);
+            }
+            default: {
+                return RemotingCommand.createResponseCommandWithHeader(ResponseCode.QUERY_NOT_FOUND, null);

Review Comment:
   QUERY_NOT_FOUND is not suitable here. It is usually used to not find consumer offset. IMO, it would be better to return RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED here.
   
   



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.processor;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ELECT_MASTER;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_METADATA_INFO;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_REPLICA_INFO;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_REGISTER_BROKER;
+
+/**
+ * Processor for controller request
+ */
+public class ControllerRequestProcessor implements NettyRequestProcessor {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private static final int WAIT_TIMEOUT_OUT = 10;
+    private final Controller controller;
+
+
+    public ControllerRequestProcessor(final Controller controller) {
+        this.controller = controller;
+    }
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        if (ctx != null) {
+            log.debug("Receive request, {} {} {}",
+                request.getCode(),
+                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                request);
+        }
+        switch(request.getCode()) {
+            case CONTROLLER_ALTER_SYNC_STATE_SET : {
+                final AlterSyncStateSetRequestHeader controllerRequest = request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
+                final CompletableFuture<AlterSyncStateSetResponseHeader> future = this.controller.alterSyncStateSet(controllerRequest);
+                if (future != null) {
+                    final AlterSyncStateSetResponseHeader resp = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                    return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, resp);
+                }
+                break;
+            }
+            case CONTROLLER_ELECT_MASTER: {
+                final ElectMasterRequestHeader controllerRequest = request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+                final CompletableFuture<ElectMasterResponseHeader> future = this.controller.electMaster(controllerRequest);
+                if (future != null) {
+                    final ElectMasterResponseHeader resp = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                    return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, resp);
+                }
+                break;
+            }
+            case CONTROLLER_REGISTER_BROKER: {
+                final RegisterBrokerRequestHeader controllerRequest = request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
+                final CompletableFuture<RegisterBrokerResponseHeader> future = this.controller.registerBroker(controllerRequest);
+                if (future != null) {
+                    final RegisterBrokerResponseHeader resp = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                    return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, resp);
+                }
+                break;
+            }
+            case CONTROLLER_GET_REPLICA_INFO: {
+                final GetReplicaInfoRequestHeader controllerRequest = request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
+                final CompletableFuture<GetReplicaInfoResponseHeader> future = this.controller.getReplicaInfo(controllerRequest);
+                if (future != null) {
+                    final GetReplicaInfoResponseHeader resp = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                    return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, resp);
+                }
+                break;
+            }
+            case CONTROLLER_GET_METADATA_INFO: {
+                final GetMetaDataResponseHeader resp = this.controller.getControllerMetadata();
+                return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, resp);
+            }
+            default: {
+                return RemotingCommand.createResponseCommandWithHeader(ResponseCode.QUERY_NOT_FOUND, null);
+            }
+        }
+        return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SYSTEM_ERROR, null);

Review Comment:
   Here you can refer to DefaultRequestProcessor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org