You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/07/20 03:26:46 UTC

[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4484: [RIP-44] Support DLedger Controller

RongtongJin commented on code in PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484#discussion_r925132475


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.controller;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+
+/**
+ * The manager of broker replicas, including: 0.regularly syncing controller metadata, change controller leader address,
+ * both master and slave will start this timed task. 1.regularly syncing metadata from controllers, and changing broker
+ * roles and master if needed, both master and slave will start this timed task. 2.regularly expanding and Shrinking
+ * syncStateSet, only master will start this timed task.
+ */
+public class ReplicasManager {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private final ScheduledExecutorService scheduledService;
+    private final ExecutorService executorService;
+    private final BrokerController brokerController;
+    private final AutoSwitchHAService haService;
+    private final BrokerConfig brokerConfig;
+    private final String localAddress;
+    private final BrokerOuterAPI brokerOuterAPI;
+    private final List<String> controllerAddresses;
+
+    private volatile String controllerLeaderAddress = "";
+    private volatile State state = State.INITIAL;
+
+    private ScheduledFuture<?> checkSyncStateSetTaskFuture;
+    private ScheduledFuture<?> slaveSyncFuture;
+
+    private Set<String> syncStateSet;
+    private int syncStateSetEpoch = 0;
+    private String masterAddress = "";
+    private int masterEpoch = 0;
+
+    /**
+     * Creates a replicas manager bound to the given broker controller.
+     *
+     * <p>Caches the collaborators obtained from the controller (outer API, HA service, broker config),
+     * creates the scheduled and plain executor pools, parses the ';'-separated controller address list
+     * from the broker config, and passes this broker's address to the HA service.
+     *
+     * @param brokerController the owning broker controller; its message store's HA service is cast to
+     *                         {@link AutoSwitchHAService}
+     */
+    public ReplicasManager(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
+        // Both pools are sized to three threads and named after the broker identity.
+        final ThreadFactoryImpl scheduledThreadFactory = new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity());
+        this.scheduledService = Executors.newScheduledThreadPool(3, scheduledThreadFactory);
+        final ThreadFactoryImpl executorThreadFactory = new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity());
+        this.executorService = Executors.newFixedThreadPool(3, executorThreadFactory);
+        this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
+        this.brokerConfig = brokerController.getBrokerConfig();
+        // Controller addresses are configured as one string with ';' between entries.
+        final String[] controllers = this.brokerConfig.getControllerAddr().split(";");
+        assert controllers.length > 0;
+        this.controllerAddresses = new ArrayList<>(Arrays.asList(controllers));
+        this.syncStateSet = new HashSet<>();
+        this.localAddress = brokerController.getBrokerAddr();
+        this.haService.setLocalAddress(this.localAddress);
+    }
+
+    enum State { // Startup progress of the manager, advanced step by step in startBasicService().
+        INITIAL, // Value the state field starts with; shutdown() also resets the state to this.
+        FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE, // Set after schedulingSyncControllerMetadata() first returns true.
+        RUNNING, // Set after registerBrokerToController() first returns true.
+    }
+
+    /**
+     * Starts the replicas manager.
+     *
+     * <p>Tries {@link #startBasicService()} once on the calling thread. If that fails, the start is
+     * retried on the executor service once per second until it succeeds, so this method itself never
+     * blocks on the retries. If the retrying thread is interrupted, its interrupt flag is restored and
+     * the retry loop stops.
+     */
+    public void start() {
+        if (startBasicService()) {
+            return;
+        }
+        LOGGER.error("Failed to start replicasManager");
+        this.executorService.submit(() -> {
+            // The first attempt already happened on the caller's thread, hence the count starts at 1.
+            int tryTimes = 1;
+            while (!startBasicService()) {
+                tryTimes++;
+                LOGGER.error("Failed to start replicasManager, try times:{}, current state:{}, try it again", tryTimes, this.state);
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // Do not swallow the interrupt: restore the flag so the pool can observe it,
+                    // and stop retrying instead of spinning on a thread that was asked to stop.
+                    Thread.currentThread().interrupt();
+                    LOGGER.warn("Interrupted while retrying to start replicasManager, give up after try times:{}", tryTimes);
+                    return;
+                }
+            }
+            LOGGER.info("Start replicasManager success, try times:{}", tryTimes);
+        });
+    }
+
+    /**
+     * Runs the startup steps that are still pending for the current {@code state}.
+     *
+     * <p>The steps are ordered: first sync controller metadata, then register this broker to the
+     * controller. A step that already succeeded is skipped on later calls because the state has moved
+     * past it. Once both are done, broker metadata syncing is scheduled and the syncStateSet changed
+     * listener is registered on the HA service.
+     *
+     * @return {@code true} if every step has completed; {@code false} as soon as a pending step fails
+     */
+    private boolean startBasicService() {
+        if (this.state == State.INITIAL) {
+            if (!schedulingSyncControllerMetadata()) {
+                return false;
+            }
+            LOGGER.info("First time sync controller metadata success");
+            this.state = State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE;
+        }
+        if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
+            if (!registerBrokerToController()) {
+                return false;
+            }
+            LOGGER.info("First time register broker success");
+            this.state = State.RUNNING;
+        }
+        schedulingSyncBrokerMetadata();
+        // Report syncStateSet changes observed by the HA service through doReportSyncStateSetChanged.
+        this.haService.registerSyncStateSetChangedListener(this::doReportSyncStateSetChanged);
+        return true;
+    }
+
+    public void shutdown() { // Resets the startup state and shuts down both executors.
+        this.state = State.INITIAL; // Back to the value the state field starts with.
+        this.executorService.shutdown(); // NOTE(review): shutdown() does not interrupt running tasks, so a retry loop submitted by start() would keep running - confirm this is intended.
+        this.scheduledService.shutdown(); // Orderly shutdown: no new tasks are accepted.
+    }

Review Comment:
   I shut down the replicasManager after the message store is shut down, so I don't think it's necessary to remove the listener.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org