You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/04/22 11:40:10 UTC

[GitHub] [rocketmq] hzh0425 commented on a diff in pull request #4195: [Summer of Code] Dledger controller

hzh0425 commented on code in PR #4195:
URL: https://github.com/apache/rocketmq/pull/4195#discussion_r856146275


##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ErrorCodes;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.manager.event.AlterSyncStateSetEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ApplyBrokerIdEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventType;
+
+/**
+ * The manager that manages the replicas info for all brokers.
+ * We can think of this class as the controller's memory state machine
+ * It should be noted that this class is not thread safe,
+ * and the upper layer needs to ensure that it can be called sequentially
+ */
+public class ReplicasInfoManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private static final Long LEADER_ID = 0L;
+    private final boolean enableElectUncleanMaster;
+    private final Map<String/* brokerName */, BrokerIdInfo> replicaInfoTable;
+    private final Map<String/* brokerName */, InSyncReplicasInfo> inSyncReplicasInfoTable;
+
+    public ReplicasInfoManager(final boolean enableElectUncleanMaster) {
+        this.enableElectUncleanMaster = enableElectUncleanMaster;
+        this.replicaInfoTable = new HashMap<>();
+        this.inSyncReplicasInfoTable = new HashMap<>();
+    }
+
+    /********************************    The following methods don't update statemachine, triggered by controller leader   ********************************/
+
+    public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
+        final AlterSyncStateSetRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<AlterSyncStateSetResponseHeader> result = new ControllerResult<>(new AlterSyncStateSetResponseHeader());
+        final AlterSyncStateSetResponseHeader response = result.getResponse();
+
+        if (isContainsBroker(brokerName)) {
+            final Set<String> newSyncStateSet = request.getNewSyncStateSet();
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+
+            // Check master
+            if (!replicasInfo.getMasterAddress().equals(request.getMasterAddress())) {
+                log.info("Rejecting alter syncStateSet request because the current leader is:{}, not {}",
+                    replicasInfo.getMasterAddress(), request.getMasterAddress());
+                response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                return result;
+            }
+
+            // Check master epoch
+            if (request.getMasterEpoch() != replicasInfo.getMasterEpoch()) {
+                log.info("Rejecting alter syncStateSet request because the current master epoch is:{}, not {}",
+                    replicasInfo.getMasterEpoch(), request.getMasterEpoch());
+                response.setErrorCode(ErrorCodes.FENCED_LEADER_EPOCH.getCode());
+                return result;
+            }
+
+            // Check syncStateSet epoch
+            if (request.getSyncStateSetEpoch() != replicasInfo.getSyncStateSetEpoch()) {
+                log.info("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{}, not {}",
+                    replicasInfo.getSyncStateSetEpoch(), request.getSyncStateSetEpoch());
+                response.setErrorCode(ErrorCodes.FENCED_SYNC_STATE_SET_EPOCH.getCode());
+                return result;
+            }
+
+            // Check newSyncStateSet correctness
+            for (String replicas : newSyncStateSet) {
+                if (!brokerIdTable.containsKey(replicas)) {
+                    log.info("Rejecting alter syncStateSet request because the replicas {} don't exist", replicas);
+                    response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                    return result;
+                }
+                // todo: check whether the replicas is active
+            }
+            if (!newSyncStateSet.contains(replicasInfo.getMasterAddress())) {
+                log.info("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {}", replicasInfo.getMasterAddress());
+                response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+                return result;
+            }
+
+            // Generate event
+            response.setNewSyncStateSetEpoch(replicasInfo.getSyncStateSetEpoch() + 1);
+            response.setNewSyncStateSet(newSyncStateSet);
+            final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet);
+            result.addEvent(event);
+
+            return result;
+        }
+        response.setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+        return result;
+    }
+
+    public ControllerResult<ElectMasterResponseHeader> electMaster(final ElectMasterRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<ElectMasterResponseHeader> result = new ControllerResult<>(new ElectMasterResponseHeader());
+        final ElectMasterResponseHeader response = result.getResponse();
+        if (isContainsBroker(brokerName)) {
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final Set<String> syncStateSet = replicasInfo.getSyncStateSet();
+            if (syncStateSet.size() > 1) {
+                for (String replicas : syncStateSet) {
+                    if (replicas.equals(replicasInfo.getMasterAddress())) {
+                        continue;
+                    }
+                    // todo: check whether the replicas is active
+                    response.setNewMasterAddress(replicas);
+                    response.setMasterEpoch(replicasInfo.getMasterEpoch() + 1);
+
+                    final ElectMasterEvent event = new ElectMasterEvent(brokerName, replicas);
+                    result.addEvent(event);
+                    return result;
+                }
+            }
+            // If elect failed, we still need to apply an ElectMasterEvent to tell the statemachine
+            // that the master was shutdown and no new master was elected.
+            final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
+            result.addEvent(event);
+            response.setErrorCode(ErrorCodes.MASTER_NOT_AVAILABLE.getCode());
+            return result;
+        }
+        result.getResponse().setErrorCode(ErrorCodes.INVALID_REQUEST.getCode());
+        return result;
+    }
+
+    public ControllerResult<RegisterBrokerResponseHeader> registerBroker(final RegisterBrokerRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final String brokerAddress = request.getBrokerAddress();
+        final ControllerResult<RegisterBrokerResponseHeader> result = new ControllerResult<>(new RegisterBrokerResponseHeader());
+        final RegisterBrokerResponseHeader response = result.getResponse();
+        boolean canBeElectedAsMaster;
+        if (isContainsBroker(brokerName)) {
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+
+            // Get brokerId.
+            long brokerId;
+            if (!brokerIdTable.containsKey(brokerAddress)) {
+                // If this broker replicas is first time come online, we need to apply a new id for this replicas.
+                brokerId = brokerInfo.newBrokerId();
+                final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(request.getBrokerName(),
+                    brokerAddress, brokerId);
+                result.addEvent(applyIdEvent);

Review Comment:
   Actually not, because eventScheduler is scheduled in fifo order.
   The next eventHandler will be dispatched only after the events generated by the previous eventHandler are appended to the dledger and applied to the state machine.
   So it doesn't happen as you said.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org