Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/05/10 06:31:33 UTC

[GitHub] [rocketmq] hzh0425 opened a new pull request, #4272: [Summer of Code] Support switch role for broker

hzh0425 opened a new pull request, #4272:
URL: https://github.com/apache/rocketmq/pull/4272

   
   ## What is the purpose of the change
   We want to unify log replication on RocketMQ's original HaService instead of DLedger mode.
   Previously, I completed the following work with @RongtongJin:
   
   - Add a state machine mode to DLedger: https://github.com/openmessaging/dledger/pull/128
   - Embed a strongly consistent, DLedger-based controller in name-srv: https://github.com/apache/rocketmq/pull/4195
   - Add a new HaService, AutoSwitchHAService, which uses a new log replication protocol to support role switching at the HaService level: https://github.com/apache/rocketmq/pull/4236
   
   This PR is the last in the series: it connects the DLedger controller's interface at the Broker level, so that the Broker gains the ability to switch between master and slave.
   
   Note that this PR is still in development and not yet complete.
   
   ## Brief changelog
   The architecture is shown in the figure below. This PR links the 'DledgerController' and 'AutoSwitchHAService' through a new component, ReplicasManager, which calls the controller's API and notifies the underlying AutoSwitchHAService.
   
   <img width="691" alt="image" src="https://user-images.githubusercontent.com/58988019/167562365-a083f415-1701-425f-92cc-eb72eb2b3f0c.png">
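
The linking role described above can be sketched roughly as follows. This is an illustrative sketch only, not the code in this PR: `ReplicaInfo`, `ControllerClient`, `HaRoleListener`, and `ReplicasManagerSketch` are hypothetical names, and the rule "act only when the master epoch increases" is an assumption made for the example.

```java
/** Hypothetical snapshot of what the controller reports for one broker group. */
final class ReplicaInfo {
    final String masterAddress;
    final int masterEpoch;

    ReplicaInfo(String masterAddress, int masterEpoch) {
        this.masterAddress = masterAddress;
        this.masterEpoch = masterEpoch;
    }
}

/** Hypothetical stand-in for the controller API as seen from the broker. */
interface ControllerClient {
    ReplicaInfo getReplicaInfo(String brokerName);
}

/** Hypothetical callbacks standing in for the HA service's role-switch hooks. */
interface HaRoleListener {
    void becomeMaster(int masterEpoch);

    void becomeSlave(String masterAddress, int masterEpoch);
}

final class ReplicasManagerSketch {
    private final ControllerClient controller;
    private final HaRoleListener haService;
    private final String brokerName;
    private final String localAddress;
    private int knownMasterEpoch = 0;

    ReplicasManagerSketch(ControllerClient controller, HaRoleListener haService,
                          String brokerName, String localAddress) {
        this.controller = controller;
        this.haService = haService;
        this.brokerName = brokerName;
        this.localAddress = localAddress;
    }

    /** Runs one sync round; returns true if a role change was pushed to the HA service. */
    boolean syncOnce() {
        ReplicaInfo info = controller.getReplicaInfo(brokerName);
        // Ignore missing or stale metadata: only a newer master epoch triggers a switch.
        if (info == null || info.masterEpoch <= knownMasterEpoch) {
            return false;
        }
        knownMasterEpoch = info.masterEpoch;
        if (localAddress.equals(info.masterAddress)) {
            haService.becomeMaster(info.masterEpoch);
        } else {
            haService.becomeSlave(info.masterAddress, info.masterEpoch);
        }
        return true;
    }
}
```

In a real broker a round like `syncOnce()` would presumably be driven by a scheduled task rather than called by hand.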
   
   ## Verifying this change
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Note: `it would be helpful if you could finish the following 5 checklist items (the last one is optional) before asking the community to review your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write the necessary unit tests (over 80% coverage) to verify that your logic is correct; prefer mocks where cross-module dependencies exist. If a new feature or significant change is committed, please remember to add an integration test in the [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit tests pass. Run `mvn clean test-compile failsafe:integration-test` to make sure integration tests pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4272: [Summer of Code] Support switch role for broker

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on code in PR #4272:
URL: https://github.com/apache/rocketmq/pull/4272#discussion_r869840414


##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/HaControllerProxy.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.hacontroller;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import static org.apache.rocketmq.common.protocol.ResponseCode.CONTROLLER_NOT_LEADER;
+import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
+
+/**
+ * The proxy of controller api.
+ */
+public class HaControllerProxy {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    public static final int RPC_TIME_OUT = 3000;
+    private final RemotingClient remotingClient;
+    private final List<String> controllerAddresses;
+    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ControllerProxy_"));
+    private volatile String controllerLeaderAddress = "";
+
+    public HaControllerProxy(final NettyClientConfig nettyClientConfig, final List<String> controllerAddresses) {
+        this.remotingClient = new NettyRemotingClient(nettyClientConfig);

Review Comment:
   I suggest putting the concrete API implementation into BrokerOuterAPI rather than creating a new NettyRemotingClient.
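
One way to read this suggestion, as a minimal sketch under stated assumptions: the controller calls live on the facade that already owns the remoting client, so no second client is created. `Transport` and `OuterApi` are hypothetical stand-ins, not RocketMQ types, and the string-based request and reply are purely illustrative.

```java
import java.util.List;

/** Hypothetical minimal transport abstraction; stands in for the remoting client. */
interface Transport {
    String invokeSync(String address, String request);
}

/** Hypothetical facade playing the role of the broker's existing outer API class. */
final class OuterApi {
    // The single shared client, injected once and reused for every remote call.
    private final Transport transport;

    OuterApi(Transport transport) {
        this.transport = transport;
    }

    /** Controller call implemented on the shared transport instead of a private one. */
    String findControllerLeader(List<String> controllerAddresses) {
        for (String address : controllerAddresses) {
            if ("LEADER".equals(transport.invokeSync(address, "GET_METADATA"))) {
                return address;
            }
        }
        return null; // no controller reported itself as leader
    }
}
```

With this shape, a proxy class would only need a reference to the facade, and the lifecycle of the underlying client stays in one place.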



##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.hacontroller;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+
+/**
+ * The manager of broker replicas, including:
+ * 1.regularly syncing metadata from controllers, and changing broker roles and master if needed, both master and slave will start this timed task.
+ * 2.regularly expanding and Shrinking syncStateSet, only master will start this timed task.
+ */
+public class ReplicasManager {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final ScheduledExecutorService syncMetadataService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ReplicasManager_SyncMetadata_"));
+    private final ScheduledExecutorService checkSyncStateSetService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ReplicasManager_CheckSyncStateSet_"));
+    private final BrokerController brokerController;
+    private final AutoSwitchHAService haService;
+    private final HaControllerProxy proxy;
+    private final String clusterName;
+    private final String brokerName;
+    private final String localAddress;
+    private final String localHaAddress;
+
+    private ScheduledFuture<?> checkSyncStateSetTaskFuture;
+    private ScheduledFuture<?> slaveSyncFuture;
+
+    private Set<String> syncStateSet;
+    private int syncStateSetEpoch = 0;
+    private BrokerRole currentRole = BrokerRole.SLAVE;
+    private Long brokerId = -1L;

Review Comment:
   Why not use these two directly from brokerConfig or messageStoreConfig? Keeping multiple copies may introduce unnecessary consistency problems.
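
A minimal sketch of the idea, assuming the role and broker id are the two fields in question: the manager holds no private copies and reads and writes through the configuration object, so there is only one source of truth. `BrokerSettings` and `ReplicaStateView` are hypothetical names, and the string-valued role is a simplification for the example.

```java
/** Hypothetical config holder; stands in for the broker's configuration objects. */
final class BrokerSettings {
    private volatile long brokerId = -1L;
    private volatile String role = "SLAVE";

    long getBrokerId() { return brokerId; }

    void setBrokerId(long brokerId) { this.brokerId = brokerId; }

    String getRole() { return role; }

    void setRole(String role) { this.role = role; }
}

/** Keeps no private copy of brokerId or role; every read goes through the config. */
final class ReplicaStateView {
    private final BrokerSettings settings;

    ReplicaStateView(BrokerSettings settings) {
        this.settings = settings;
    }

    /** Writes the new state into the shared config instead of local fields. */
    void onRoleChanged(String newRole, long newBrokerId) {
        settings.setRole(newRole);
        settings.setBrokerId(newBrokerId);
    }

    boolean isMaster() { return "MASTER".equals(settings.getRole()); }

    long brokerId() { return settings.getBrokerId(); }
}
```

Because the view never caches the values, a change made elsewhere through the config is visible immediately.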



##########
container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java:
##########
@@ -86,7 +86,7 @@ public void run2() {
             }
         }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));
 
-        if (this.brokerConfig.isEnableSlaveActingMaster()) {
+        if (this.brokerConfig.isEnableSlaveActingMaster() || this.messageStoreConfig.isStartupControllerMode()) {

Review Comment:
   I suggest changing this to match what BrokerController does; otherwise syncBrokerMemberGroup will be called.



##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/HaControllerProxy.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.hacontroller;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import static org.apache.rocketmq.common.protocol.ResponseCode.CONTROLLER_NOT_LEADER;
+import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
+
+/**
+ * The proxy of controller api.
+ */
+public class HaControllerProxy {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    public static final int RPC_TIME_OUT = 3000;
+    private final RemotingClient remotingClient;
+    private final List<String> controllerAddresses;
+    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ControllerProxy_"));
+    private volatile String controllerLeaderAddress = "";
+
+    public HaControllerProxy(final NettyClientConfig nettyClientConfig, final List<String> controllerAddresses) {
+        this.remotingClient = new NettyRemotingClient(nettyClientConfig);
+        this.controllerAddresses = controllerAddresses;
+    }
+
+    public boolean start() {
+        this.remotingClient.start();
+        // Get controller metadata first.
+        int tryTimes = 0;
+        while (tryTimes < 3) {
+            boolean flag = updateControllerMetadata();
+            if (flag) {
+                this.executorService.scheduleAtFixedRate(this::updateControllerMetadata, 0, 2, TimeUnit.SECONDS);
+                return true;
+            }
+            tryTimes ++;
+        }
+        LOGGER.error("Failed to init controller metadata, maybe the controllers in {} is not available", this.controllerAddresses);
+        return false;
+    }
+
+    public void shutdown() {
+        this.remotingClient.shutdown();
+        this.executorService.shutdown();
+    }
+
+    /**
+     * Update controller metadata(leaderAddress)
+     */
+    private boolean updateControllerMetadata() {
+        for (final String address : this.controllerAddresses) {
+            final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_METADATA_INFO, null);
+            try {
+                final RemotingCommand response = this.remotingClient.invokeSync(address, request, RPC_TIME_OUT);
+                if (response.getCode() == SUCCESS) {
+                    final GetMetaDataResponseHeader responseHeader = response.decodeCommandCustomHeader(GetMetaDataResponseHeader.class);
+                    if (responseHeader != null && responseHeader.isLeader()) {
+                        // Because the controller is served externally with the help of name-srv
+                        this.controllerLeaderAddress = address;
+                        LOGGER.info("Change controller leader address to {}", this.controllerAddresses);
+                        return true;
+                    }
+                }
+            } catch (final Exception e) {
+                LOGGER.error("Error happen when pull controller metadata", e);
+                return false;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Alter syncStateSet
+     */
+    public SyncStateSet alterSyncStateSet(String brokerName,
+        final String masterAddress, final int masterEpoch,
+        final Set<String> newSyncStateSet, final int syncStateSetEpoch) throws Exception {
+
+        final AlterSyncStateSetRequestHeader requestHeader = new AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch);
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, requestHeader);
+        request.setBody(new SyncStateSet(newSyncStateSet, syncStateSetEpoch).encode());
+        final RemotingCommand response = this.remotingClient.invokeSync(this.controllerLeaderAddress, request, RPC_TIME_OUT);
+        assert response != null;
+        switch (response.getCode()) {
+            case SUCCESS: {
+                assert response.getBody() != null;
+                return RemotingSerializable.decode(response.getBody(), SyncStateSet.class);
+            }
+            case CONTROLLER_NOT_LEADER: {
+                throw new MQBrokerException(response.getCode(), "Controller leader was changed");
+            }
+        }

Review Comment:
   I suggest adding more response code cases; the same applies below.
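
As a sketch of what fuller handling might look like: every known code gets an explicit case, and a `default` branch fails loudly instead of falling out of the `switch` silently. The names `ResponseCodes`, `ControllerException`, and `ResponseHandler` are hypothetical, and the numeric code values are placeholders rather than RocketMQ's real constants.

```java
/** Hypothetical response codes; the numeric values are illustrative only. */
final class ResponseCodes {
    static final int SUCCESS = 0;
    static final int SYSTEM_ERROR = 1;
    static final int CONTROLLER_NOT_LEADER = 2007;

    private ResponseCodes() { }
}

/** Hypothetical unchecked exception carrying the offending response code. */
final class ControllerException extends RuntimeException {
    final int code;

    ControllerException(int code, String message) {
        super(message);
        this.code = code;
    }
}

final class ResponseHandler {
    private ResponseHandler() { }

    /** Maps a response code to a result, throwing on every non-success code. */
    static String handle(int code, String body) {
        switch (code) {
            case ResponseCodes.SUCCESS:
                if (body == null) {
                    throw new ControllerException(code, "Success response carried no body");
                }
                return body;
            case ResponseCodes.CONTROLLER_NOT_LEADER:
                throw new ControllerException(code, "Controller leader was changed");
            case ResponseCodes.SYSTEM_ERROR:
                throw new ControllerException(code, "Controller reported a system error");
            default:
                // Unknown codes are surfaced rather than silently ignored.
                throw new ControllerException(code, "Unexpected response code: " + code);
        }
    }
}
```

Throwing from the `default` branch also avoids relying on `assert`, which is disabled unless the JVM runs with `-ea`.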



##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.hacontroller;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+
+/**
+ * The manager of broker replicas, including:
+ * 1.regularly syncing metadata from controllers, and changing broker roles and master if needed, both master and slave will start this timed task.
+ * 2.regularly expanding and Shrinking syncStateSet, only master will start this timed task.
+ */
+public class ReplicasManager {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final ScheduledExecutorService syncMetadataService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ReplicasManager_SyncMetadata_"));
+    private final ScheduledExecutorService checkSyncStateSetService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ReplicasManager_CheckSyncStateSet_"));
+    private final BrokerController brokerController;
+    private final AutoSwitchHAService haService;
+    private final HaControllerProxy proxy;
+    private final String clusterName;
+    private final String brokerName;
+    private final String localAddress;
+    private final String localHaAddress;
+
+    private ScheduledFuture<?> checkSyncStateSetTaskFuture;
+    private ScheduledFuture<?> slaveSyncFuture;
+
+    private Set<String> syncStateSet;
+    private int syncStateSetEpoch = 0;
+    private BrokerRole currentRole = BrokerRole.SLAVE;
+    private Long brokerId = -1L;
+    private String masterAddress = "";
+    private int masterEpoch = 0;
+
+    public ReplicasManager(final BrokerController brokerController, final MessageStore messageStore) {

Review Comment:
   MessageStore can be obtained from brokerController, so brokerController is the only parameter needed.



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/BrokerInfo.java:
##########
@@ -17,23 +17,26 @@
 package org.apache.rocketmq.namesrv.controller.manager;
 
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.Pair;
 
 /**
- * Broker id info, mapping from brokerAddress to brokerId.
+ * Broker info, mapping from brokerAddress to {brokerId, brokerHaAddress}.
  */
-public class BrokerIdInfo {
+public class BrokerInfo {
     private final String clusterName;
     private final String brokerName;
     // Start from 1
     private final AtomicLong brokerIdCount;
-    private final HashMap<String/*Address*/, Long/*brokerId*/> brokerIdTable;
+    private final HashMap<String/*Address*/, Pair<Long/*brokerId*/, String/*HaAddress*/>> brokerTable;

Review Comment:
   There is no need to return HaAddress; the NameServer registration is authoritative.



##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.hacontroller;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+
+/**
+ * The manager of broker replicas, including:
+ * 1.regularly syncing metadata from controllers, and changing broker roles and master if needed, both master and slave will start this timed task.
+ * 2.regularly expanding and Shrinking syncStateSet, only master will start this timed task.
+ */
+public class ReplicasManager {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final ScheduledExecutorService syncMetadataService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ReplicasManager_SyncMetadata_"));
+    private final ScheduledExecutorService checkSyncStateSetService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ReplicasManager_CheckSyncStateSet_"));
+    private final BrokerController brokerController;
+    private final AutoSwitchHAService haService;
+    private final HaControllerProxy proxy;
+    private final String clusterName;
+    private final String brokerName;
+    private final String localAddress;
+    private final String localHaAddress;
+
+    private ScheduledFuture<?> checkSyncStateSetTaskFuture;
+    private ScheduledFuture<?> slaveSyncFuture;
+
+    private Set<String> syncStateSet;
+    private int syncStateSetEpoch = 0;
+    private BrokerRole currentRole = BrokerRole.SLAVE;
+    private Long brokerId = -1L;
+    private String masterAddress = "";
+    private int masterEpoch = 0;
+
+    public ReplicasManager(final BrokerController brokerController, final MessageStore messageStore) {
+        this.brokerController = brokerController;
+        this.haService = (AutoSwitchHAService) messageStore.getHaService();
+        final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+        final String controllerPaths = brokerConfig.getNamesrvAddr();
+        final String[] controllers = controllerPaths.split(";");
+        assert controllers.length > 0;
+        this.proxy = new HaControllerProxy(brokerController.getNettyClientConfig(), Arrays.asList(controllers));
+        this.syncStateSet = new HashSet<>();
+        this.clusterName = brokerConfig.getBrokerClusterName();
+        this.brokerName = brokerConfig.getBrokerName();
+        this.localAddress = brokerController.getBrokerAddr();
+        this.localHaAddress = brokerController.getHAServerAddr();
+        this.haService.setLocalAddress(this.localAddress);
+    }
+
+    public boolean start() {

Review Comment:
   Returning a boolean doesn't seem to serve any purpose here.





[GitHub] [rocketmq] codecov-commenter commented on pull request #4272: [Summer of Code] Support switch role for broker

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #4272:
URL: https://github.com/apache/rocketmq/pull/4272#issuecomment-1127342973

   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/4272?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#4272](https://codecov.io/gh/apache/rocketmq/pull/4272?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c41aefc) into [5.0.0-beta-dledger-controller](https://codecov.io/gh/apache/rocketmq/commit/ce534eea149a422299d474faac15c73871dc7ba0?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ce534ee) will **decrease** coverage by `0.28%`.
   > The diff coverage is `23.34%`.
   
   ```diff
   @@                         Coverage Diff                         @@
   ##             5.0.0-beta-dledger-controller    #4272      +/-   ##
   ===================================================================
   - Coverage                            43.73%   43.45%   -0.29%     
   - Complexity                            6272     6274       +2     
   ===================================================================
     Files                                  827      829       +2     
     Lines                                58610    58937     +327     
     Branches                              8003     8046      +43     
   ===================================================================
   - Hits                                 25634    25610      -24     
   - Misses                               29670    30008     +338     
   - Partials                              3306     3319      +13     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/4272?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...a/org/apache/rocketmq/broker/BrokerController.java](https://codecov.io/gh/apache/rocketmq/pull/4272/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvQnJva2VyQ29udHJvbGxlci5qYXZh) | `45.20% <0.00%> (-3.92%)` | :arrow_down: |
   | [.../rocketmq/broker/hacontroller/ReplicasManager.java](https://codecov.io/gh/apache/rocketmq/pull/4272/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvaGFjb250cm9sbGVyL1JlcGxpY2FzTWFuYWdlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...org/apache/rocketmq/broker/out/BrokerOuterAPI.java](https://codecov.io/gh/apache/rocketmq/pull/4272/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvb3V0L0Jyb2tlck91dGVyQVBJLmphdmE=) | `19.86% <0.00%> (-1.71%)` | :arrow_down: |
   | [...he/rocketmq/common/protocol/body/SyncStateSet.java](https://codecov.io/gh/apache/rocketmq/pull/4272/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vcHJvdG9jb2wvYm9keS9TeW5jU3RhdGVTZXQuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...srv/controller/AlterSyncStateSetRequestHeader.java](https://codecov.io/gh/apache/rocketmq/pull/4272/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vcHJvdG9jb2wvaGVhZGVyL25hbWVzcnYvY29udHJvbGxlci9BbHRlclN5bmNTdGF0ZVNldFJlcXVlc3RIZWFkZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...rv/controller/AlterSyncStateSetResponseHeader.java](https://codecov.io/gh/apache/rocketmq/pull/4272/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vcHJvdG9jb2wvaGVhZGVyL25hbWVzcnYvY29udHJvbGxlci9BbHRlclN5bmNTdGF0ZVNldFJlc3BvbnNlSGVhZGVyLmphdmE=) | `0.00% <ø> (ø)` | |
   | [...amesrv/controller/BrokerRegisterRequestHeader.java](https://codecov.io/gh/apache/rocketmq/pull/4272/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vcHJvdG9jb2wvaGVhZGVyL25hbWVzcnYvY29udHJvbGxlci9Ccm9rZXJSZWdpc3RlclJlcXVlc3RIZWFkZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...mesrv/controller/BrokerRegisterResponseHeader.java](https://codecov.io/gh/apache/rocketmq/pull/4272/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vcHJvdG9jb2wvaGVhZGVyL25hbWVzcnYvY29udHJvbGxlci9Ccm9rZXJSZWdpc3RlclJlc3BvbnNlSGVhZGVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...r/namesrv/controller/ElectMasterRequestHeader.java](https://codecov.io/gh/apache/rocketmq/pull/4272/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vcHJvdG9jb2wvaGVhZGVyL25hbWVzcnYvY29udHJvbGxlci9FbGVjdE1hc3RlclJlcXVlc3RIZWFkZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [.../namesrv/controller/GetMetaDataResponseHeader.java](https://codecov.io/gh/apache/rocketmq/pull/4272/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vcHJvdG9jb2wvaGVhZGVyL25hbWVzcnYvY29udHJvbGxlci9HZXRNZXRhRGF0YVJlc3BvbnNlSGVhZGVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | ... and [35 more](https://codecov.io/gh/apache/rocketmq/pull/4272/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/4272?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/4272?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [ce534ee...c41aefc](https://codecov.io/gh/apache/rocketmq/pull/4272?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4272: [Summer of Code] Support switch role for broker

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on code in PR #4272:
URL: https://github.com/apache/rocketmq/pull/4272#discussion_r870914660


##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.hacontroller;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+
+/**
+ * The manager of broker replicas, including:
+ * 0.regularly syncing controller metadata, change controller leader address, both master and slave will start this timed task.
+ * 1.regularly syncing metadata from controllers, and changing broker roles and master if needed, both master and slave will start this timed task.
+ * 2.regularly expanding and Shrinking syncStateSet, only master will start this timed task.
+ */
+public class ReplicasManager {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    public static final int SYNC_BROKER_METADATA_PERIOD = 5 * 1000;
+    public static final int SYNC_CONTROLLER_METADATA_PERIOD = 10 * 1000;
+    public static final int CHECK_SYNC_STATE_SET_PERIOD = 8 * 1000;
+
+    private final ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_"));
+    private final BrokerController brokerController;
+    private final AutoSwitchHAService haService;
+    private final BrokerConfig brokerConfig;
+    private final String localAddress;
+    private final BrokerOuterAPI outerAPI;
+    private final List<String> controllerAddresses;
+
+    private volatile String controllerLeaderAddress = "";
+
+    private ScheduledFuture<?> checkSyncStateSetTaskFuture;
+    private ScheduledFuture<?> slaveSyncFuture;
+
+    private Set<String> syncStateSet;
+    private int syncStateSetEpoch = 0;
+    private String masterAddress = "";
+    private int masterEpoch = 0;
+
+    public ReplicasManager(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.outerAPI = brokerController.getBrokerOuterAPI();
+        this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
+        this.brokerConfig = brokerController.getBrokerConfig();
+        final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+        final String controllerPaths = brokerConfig.getNamesrvAddr();
+        final String[] controllers = controllerPaths.split(";");
+        assert controllers.length > 0;
+        this.controllerAddresses = new ArrayList<>(Arrays.asList(controllers));
+        this.syncStateSet = new HashSet<>();
+        this.localAddress = brokerController.getBrokerAddr();
+        this.haService.setLocalAddress(this.localAddress);
+    }
+
+    public void start() {
+        if (!schedulingSyncControllerMetadata()) {
+            return;
+        }
+
+        if (!registerBroker()) {
+            return;
+        }
+
+        schedulingSyncBrokerMetadata();
+    }
+
+    public void shutdown() {
+        this.scheduledService.shutdown();
+    }
+
+    public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch) {
+        synchronized (this) {
+            if (newMasterEpoch > this.masterEpoch) {
+                LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.localAddress, newMasterEpoch);
+
+                // Change record
+                this.masterAddress = this.localAddress;
+                this.masterEpoch = newMasterEpoch;
+
+                // Change sync state set
+                final HashSet<String> newSyncStateSet = new HashSet<>();
+                newSyncStateSet.add(this.localAddress);
+                changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
+                schedulingCheckSyncStateSet();
+
+                // Handle the slave synchronise
+                handleSlaveSynchronize(BrokerRole.SYNC_MASTER);
+
+                this.brokerController.getBrokerConfig().setBrokerId(MixAll.MASTER_ID);
+                this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SYNC_MASTER);
+
+                // Register broker to name-srv
+                try {
+                    this.brokerController.registerBrokerAll(true, false, this.brokerController.getBrokerConfig().isForceRegister());
+                } catch (final Throwable e) {
+                    LOGGER.error("Error happen when register broker to name-srv, Failed to change broker to master", e);
+                    return;
+                }
+
+                // Notify ha service, change to master
+                this.haService.changeToMaster(newMasterEpoch);
+                LOGGER.info("Change broker {} to master success, masterEpoch {}, syncStateSetEpoch:{}", this.localAddress, newMasterEpoch, syncStateSetEpoch);
+            }
+        }
+    }
+
+    public void changeToSlave(final String newMasterAddress, final int newMasterEpoch) {
+        synchronized (this) {
+            if (newMasterEpoch > this.masterEpoch) {
+                LOGGER.info("Begin to change to slave, brokerName={}, replicas:{}, brokerId={}", this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getBrokerId());
+
+                // Change record
+                this.masterAddress = newMasterAddress;
+                this.masterEpoch = newMasterEpoch;
+                stopCheckSyncStateSet();
+
+                // Change config
+                this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SLAVE);
+                this.brokerController.changeSpecialServiceStatus(false);
+
+                // Handle the slave synchronise
+                handleSlaveSynchronize(BrokerRole.SLAVE);
+
+                // Register broker to name-srv
+                try {
+                    this.brokerController.registerBrokerAll(true, false, this.brokerController.getBrokerConfig().isForceRegister());
+                } catch (final Throwable e) {
+                    LOGGER.error("Error happen when register broker to name-srv, Failed to change broker to slave", e);
+                    return;
+                }

Review Comment:
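   I suggest running this asynchronously in a thread pool: it involves a time-consuming RPC call, which also should not be made while holding the lock.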
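
A minimal sketch of this suggestion, not the code that was eventually merged. It assumes the role change can be split into cheap in-memory bookkeeping (done under the lock) and a slow registration RPC (handed to an executor). `AsyncRoleChangeSketch` and `registerRpc` are hypothetical names; `registerRpc` stands in for the `registerBrokerAll(...)` call in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for ReplicasManager; only the locking pattern is illustrated.
class AsyncRoleChangeSketch {
    // A single worker thread keeps follow-up RPCs in the order the role changes were accepted.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Runnable registerRpc; // assumption: wraps the slow name-srv registration
    private String masterAddress = "";
    private int masterEpoch = 0;

    AsyncRoleChangeSketch(Runnable registerRpc) {
        this.registerRpc = registerRpc;
    }

    // Completes with false if the epoch is stale, true once the async registration has run.
    CompletableFuture<Boolean> changeToSlave(String newMasterAddress, int newMasterEpoch) {
        synchronized (this) {
            if (newMasterEpoch <= this.masterEpoch) {
                return CompletableFuture.completedFuture(false);
            }
            // Only in-memory state is touched while the lock is held.
            this.masterAddress = newMasterAddress;
            this.masterEpoch = newMasterEpoch;
        }
        // The slow RPC runs on the executor thread, outside the lock.
        return CompletableFuture.supplyAsync(() -> {
            registerRpc.run();
            return true;
        }, executor);
    }

    synchronized String getMasterAddress() {
        return masterAddress;
    }

    synchronized int getMasterEpoch() {
        return masterEpoch;
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        AsyncRoleChangeSketch sketch =
            new AsyncRoleChangeSketch(() -> System.out.println("registering broker"));
        System.out.println(sketch.changeToSlave("127.0.0.1:10911", 1).join());
        sketch.shutdown();
    }
}
```

With this split, a caller that needs to know when registration has finished can wait on the returned future, while other threads are never blocked on the lock for the duration of the RPC.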



##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java:
##########
@@ -0,0 +1,354 @@
+    public ReplicasManager(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.outerAPI = brokerController.getBrokerOuterAPI();
+        this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
+        this.brokerConfig = brokerController.getBrokerConfig();
+        final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+        final String controllerPaths = brokerConfig.getNamesrvAddr();

Review Comment:
   I suggest adding a new controllerAddr configuration here, since not every nameserver contains a controller.
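
A minimal sketch of how a dedicated setting might be parsed, assuming it keeps the same semicolon-separated format as the name-srv address list. `ControllerAddrSketch` and `parseControllerAddresses` are hypothetical names and are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the real configuration key and its parsing may differ.
class ControllerAddrSketch {
    // Splits "host:port;host:port" into a list, ignoring blanks such as a trailing ";".
    static List<String> parseControllerAddresses(String controllerAddr) {
        List<String> addresses = new ArrayList<>();
        if (controllerAddr == null) {
            return addresses;
        }
        for (String part : controllerAddr.split(";")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                addresses.add(trimmed);
            }
        }
        return addresses;
    }

    public static void main(String[] args) {
        System.out.println(parseControllerAddresses("127.0.0.1:9878;127.0.0.1:9879;"));
    }
}
```

Returning an empty list for a missing or blank value lets the caller fail fast with a clear configuration error rather than relying on an `assert`, which is disabled by default at runtime.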



##########
test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.autoswitchrole;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.hacontroller.ReplicasManager;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
+
+    private ControllerConfig controllerConfig;
+    private NamesrvController namesrvController;
+
+    private BrokerController brokerController1;
+    private BrokerController brokerController2;
+
+    private MessageStoreConfig storeConfig1;
+    private MessageStoreConfig storeConfig2;
+    private BrokerConfig brokerConfig1;
+    private BrokerConfig brokerConfig2;
+    private NettyServerConfig brokerNettyServerConfig1;
+    private NettyServerConfig brokerNettyServerConfig2;
+
+
+    @Before
+    public void init() throws Exception {
+        super.initialize();
+
+        // Startup namesrv
+        final String peers = String.format("n0-localhost:%d", 30000);
+        final NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(31000);
+
+        this.controllerConfig = buildControllerConfig("n0", peers);
+        this.namesrvController = new NamesrvController(new NamesrvConfig(), serverConfig, new NettyClientConfig(), controllerConfig);
+        assertTrue(namesrvController.initialize());
+        namesrvController.start();
+
+        final String namesrvAddress = "127.0.0.1:31000;";
+        for (int i = 0; i < 2; i++) {
+            final MessageStoreConfig storeConfig = buildMessageStoreConfig("broker" + i, 20000 + i);
+            final BrokerConfig brokerConfig = new BrokerConfig();
+            brokerConfig.setListenPort(21000 + i);
+            brokerConfig.setNamesrvAddr(namesrvAddress);
+            brokerConfig.setMetaDataHosts(namesrvAddress);
+
+            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+            nettyServerConfig.setListenPort(22000 + i);
+
+            final BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, new NettyClientConfig(), storeConfig);
+            assertTrue(brokerController.initialize());
+            brokerController.start();
+            System.out.println("Start controller success");
+            Thread.sleep(1000);
+            // The first is master
+            if (i == 0) {
+                assertTrue(brokerController.getReplicasManager().isMasterState());
+                this.brokerController1 = brokerController;
+                this.storeConfig1 = storeConfig;
+                this.brokerConfig1 = brokerConfig;
+                this.brokerNettyServerConfig1 = nettyServerConfig;
+            } else {
+                assertFalse(brokerController.getReplicasManager().isMasterState());
+                this.brokerController2 = brokerController;
+                this.storeConfig2 = storeConfig;
+                this.brokerConfig2 = brokerConfig;
+                this.brokerNettyServerConfig2 = nettyServerConfig;
+            }
+        }
+
+        // Wait slave connecting to master
+        Thread.sleep(15000);
+    }
+
+    public void mockData() throws Exception {
+        System.out.println("Begin test");
+        final MessageStore messageStore = brokerController1.getMessageStore();
+        putMessage(messageStore);
+
+        // Check slave message
+        checkMessage(brokerController2.getMessageStore(), 10, 0);
+    }
+
+    @Test
+    public void testCheckSyncStateSet() throws Exception {
+        mockData();
+
+        // Check sync state set
+        final ReplicasManager replicasManager = brokerController1.getReplicasManager();
+        final SyncStateSet syncStateSet = replicasManager.getSyncStateSet();
+        assertEquals(2, syncStateSet.getSyncStateSet().size());
+    }
+
+    @Test
+    public void testChangeMaster() throws Exception {
+        mockData();
+
+        // Let master shutdown
+        brokerController1.shutdown();
+        Thread.sleep(5000);
+
+        // The slave should change to master
+        assertTrue(brokerController2.getReplicasManager().isMasterState());
+        assertEquals(brokerController2.getReplicasManager().getMasterEpoch(), 2);
+
+        // Restart old master, it should be slave
+        brokerController1 = new BrokerController(brokerConfig1, brokerNettyServerConfig1, new NettyClientConfig(), storeConfig1);
+        brokerController1.initialize();
+        brokerController1.start();
+
+        Thread.sleep(20000);
+        assertFalse(brokerController1.getReplicasManager().isMasterState());
+        assertEquals(brokerController1.getReplicasManager().getMasterAddress(), brokerController2.getReplicasManager().getLocalAddress());
+
+        // Put another batch messages
+        final MessageStore messageStore = brokerController2.getMessageStore();
+        putMessage(messageStore);
+
+        // Check slave message
+        checkMessage(brokerController1.getMessageStore(), 20, 0);
+    }
+
+

Review Comment:
   It would be best to add tests that cover the truncation algorithm.
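
The truncation algorithm itself is not shown in this thread, so the following is only a sketch of what such a test might exercise. It assumes an epoch-based scheme in which each replica records the log range written under each master epoch, and a slave truncates to the last point where its history agrees with the master's. `TruncationSketch`, `EpochEntry`, and `findConsistentPoint` here are hypothetical and are not the PR's classes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of epoch-based truncation; not the AutoSwitchHAService implementation.
class TruncationSketch {
    // Log range [startOffset, endOffset) written while one master epoch was active.
    static final class EpochEntry {
        final int epoch;
        final long startOffset;
        final long endOffset;

        EpochEntry(int epoch, long startOffset, long endOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    // Scans the local history from newest to oldest and returns the offset up to which
    // both logs are assumed identical, or -1 if the two histories share no epoch.
    static long findConsistentPoint(List<EpochEntry> local, List<EpochEntry> remote) {
        for (int i = local.size() - 1; i >= 0; i--) {
            EpochEntry mine = local.get(i);
            for (EpochEntry theirs : remote) {
                if (mine.epoch == theirs.epoch && mine.startOffset == theirs.startOffset) {
                    return Math.min(mine.endOffset, theirs.endOffset);
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // The old master wrote up to offset 250 in epoch 2, but only 200 was replicated
        // before a new master was elected in epoch 3.
        List<EpochEntry> oldMaster = Arrays.asList(
            new EpochEntry(1, 0, 100), new EpochEntry(2, 100, 250));
        List<EpochEntry> newMaster = Arrays.asList(
            new EpochEntry(1, 0, 100), new EpochEntry(2, 100, 200), new EpochEntry(3, 200, 300));
        System.out.println(findConsistentPoint(oldMaster, newMaster));
    }
}
```

An integration test along these lines would let the old master accept writes that never reach the slave, trigger a switch, restart the old master, and then check that its log was cut back to the consistent point before it resumed replication.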



##########
container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java:
##########
@@ -113,11 +100,32 @@ public void run2() {
             }, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
         }
 
+        if (this.messageStoreConfig.isStartupControllerMode()) {
+            scheduleSendHeartbeat();
+        }
+
         if (brokerConfig.isSkipPreOnline()) {
             startServiceWithoutCondition();
         }
     }
 
+    private void scheduleSendHeartbeat() {
+        scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
+            @Override
+            public void run2() {
+                if (isIsolated) {
+                    return;
+                }
+                try {
+                    InnerBrokerController.this.sendHeartbeat();
+                } catch (Exception e) {
+                    BrokerController.LOG.error("sendHeartbeat Exception", e);
+                }
+
+            }
+        }, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS));
+    }

Review Comment:
   Change the method in the parent class BrokerController to protected and use it directly.
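
A minimal sketch of the refactoring being suggested, assuming the parent class already has an equivalent heartbeat-scheduling method whose visibility can be widened. The class names below are hypothetical stand-ins for `BrokerController` and `InnerBrokerController`, and the counter replaces the real scheduling logic.

```java
// Hypothetical stand-in for BrokerController.
class ParentBrokerSketch {
    protected int scheduledHeartbeatTasks = 0;

    // Declared protected (rather than private) so subclasses can reuse it instead of copying it.
    protected void scheduleSendHeartbeat() {
        scheduledHeartbeatTasks++; // the real method would register a periodic task
    }
}

// Hypothetical stand-in for InnerBrokerController.
class InnerBrokerSketch extends ParentBrokerSketch {
    void start(boolean startupControllerMode) {
        if (startupControllerMode) {
            scheduleSendHeartbeat(); // inherited; no duplicated implementation needed
        }
    }

    int getScheduledHeartbeatTasks() {
        return scheduledHeartbeatTasks;
    }

    public static void main(String[] args) {
        InnerBrokerSketch broker = new InnerBrokerSketch();
        broker.start(true);
        System.out.println(broker.getScheduledHeartbeatTasks());
    }
}
```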



##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java:
##########
@@ -0,0 +1,354 @@
+    public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch) {
+        synchronized (this) {
+            if (newMasterEpoch > this.masterEpoch) {
+                LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.localAddress, newMasterEpoch);
+
+                // Change record
+                this.masterAddress = this.localAddress;
+                this.masterEpoch = newMasterEpoch;
+
+                // Change sync state set
+                final HashSet<String> newSyncStateSet = new HashSet<>();
+                newSyncStateSet.add(this.localAddress);
+                changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
+                schedulingCheckSyncStateSet();
+
+                // Handle the slave synchronise
+                handleSlaveSynchronize(BrokerRole.SYNC_MASTER);
+
+                this.brokerController.getBrokerConfig().setBrokerId(MixAll.MASTER_ID);
+                this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SYNC_MASTER);

Review Comment:
   You forgot to call changeSpecialServiceStatus with true here.



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -42,22 +43,33 @@
 
 public class AutoSwitchHAClient extends ServiceThread implements HAClient {
 
+    /**
+     * Handshake header buffer size. Schema: state ordinal + flag(isSyncFromLastFile) + slaveId + slaveAddressLength.
+     */
+    public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 8 + 4;

Review Comment:
   It would be better to put the slaveAddress content in the header rather than in the body; putting only the IP in the body is a bit odd.
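
A minimal sketch of a header that carries the address inline, assuming the field order given in the Javadoc above (state ordinal, isSyncFromLastFile flag, slaveId, slaveAddressLength) followed directly by the address bytes. `HandshakeHeaderSketch`, its methods, and the choice of UTF-8 are assumptions for illustration, not the PR's wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical encoder/decoder; the actual AutoSwitchHAClient protocol may differ.
class HandshakeHeaderSketch {
    // Fixed part: state ordinal (int) + flag (int) + slaveId (long) + slaveAddressLength (int).
    static final int FIXED_HEADER_SIZE = 4 + 4 + 8 + 4;

    static ByteBuffer encode(int stateOrdinal, boolean syncFromLastFile, long slaveId, String slaveAddress) {
        byte[] address = slaveAddress.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(FIXED_HEADER_SIZE + address.length);
        buffer.putInt(stateOrdinal);
        buffer.putInt(syncFromLastFile ? 1 : 0);
        buffer.putLong(slaveId);
        buffer.putInt(address.length);
        buffer.put(address); // variable-length tail, sized by the preceding length field
        buffer.flip();       // switch from writing to reading
        return buffer;
    }

    static String decodeAddress(ByteBuffer header) {
        ByteBuffer view = header.duplicate(); // leave the caller's position untouched
        view.position(view.position() + 4 + 4 + 8); // skip state, flag, and slaveId
        byte[] address = new byte[view.getInt()];
        view.get(address);
        return new String(address, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer header = encode(1, true, 2L, "127.0.0.1:10912");
        System.out.println(header.remaining() + " bytes, address=" + decodeAddress(header));
    }
}
```

Because the length field precedes the address, the receiver can read the fixed 20 bytes first and then knows exactly how many more bytes complete the header.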





[GitHub] [rocketmq] hzh0425 commented on a diff in pull request #4272: [Summer of Code] Support switch role for broker

Posted by GitBox <gi...@apache.org>.
hzh0425 commented on code in PR #4272:
URL: https://github.com/apache/rocketmq/pull/4272#discussion_r871065178


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -42,22 +43,33 @@
 
 public class AutoSwitchHAClient extends ServiceThread implements HAClient {
 
+    /**
+     * Handshake header buffer size. Schema: state ordinal + flag(isSyncFromLastFile) + slaveId + slaveAddressLength.
+     */
+    public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 8 + 4;

Review Comment:
   OK. Later we will probably identify the slave directly by brokerId and stop using the address.
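
To make the layout under discussion concrete, here is a minimal sketch of a handshake packet that follows the schema in the Javadoc above: state ordinal (4 bytes) + isSyncFromLastFile flag (4 bytes) + slaveId (8 bytes) + slaveAddressLength (4 bytes), followed by the address bytes as the body. The class name `HandshakeHeaderSketch`, the `encode`/`decodeAddress` helpers, the 1/0 flag encoding, and the UTF-8 address encoding are assumptions made for illustration; this is not the code in AutoSwitchHAClient.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; not the actual AutoSwitchHAClient implementation.
public class HandshakeHeaderSketch {
    // state ordinal (int) + isSyncFromLastFile flag (int) + slaveId (long) + slaveAddressLength (int)
    public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 8 + 4;

    // Writes the fixed-size header, then the variable-length address as the body.
    public static ByteBuffer encode(int stateOrdinal, boolean isSyncFromLastFile,
                                    long slaveId, String slaveAddress) {
        byte[] address = slaveAddress.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(HANDSHAKE_HEADER_SIZE + address.length);
        buf.putInt(stateOrdinal);
        buf.putInt(isSyncFromLastFile ? 1 : 0); // assumed encoding of the flag
        buf.putLong(slaveId);
        buf.putInt(address.length);
        buf.put(address);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Reads the address length from the last header field, then the body bytes.
    public static String decodeAddress(ByteBuffer packet) {
        ByteBuffer view = packet.duplicate(); // leave the caller's position untouched
        view.position(view.position() + HANDSHAKE_HEADER_SIZE - 4);
        int length = view.getInt();
        byte[] address = new byte[length];
        view.get(address);
        return new String(address, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer packet = encode(0, true, 1L, "127.0.0.1:10912");
        System.out.println("packet bytes: " + packet.remaining());
        System.out.println("slave address: " + decodeAddress(packet));
    }
}
```

Because the address length varies, a receiver in this layout has to read the fixed 20-byte header first to learn how many body bytes follow. Identifying the slave by a fixed-width brokerId, as suggested in the reply, would remove the variable-length part entirely.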





[GitHub] [rocketmq] coveralls commented on pull request #4272: [Summer of Code] Support switch role for broker

Posted by GitBox <gi...@apache.org>.
coveralls commented on PR #4272:
URL: https://github.com/apache/rocketmq/pull/4272#issuecomment-1127371298

   
   [![Coverage Status](https://coveralls.io/builds/49139024/badge)](https://coveralls.io/builds/49139024)
   
   Coverage decreased (-0.3%) to 47.653% when pulling **c41aefc4ce92c8de51fc10f6ff66c7b4cb499665 on hzh0425:feature/role-switch** into **ce534eea149a422299d474faac15c73871dc7ba0 on apache:5.0.0-beta-dledger-controller**.
   




[GitHub] [rocketmq] RongtongJin merged pull request #4272: [Summer of Code] Support switch role for broker

Posted by GitBox <gi...@apache.org>.
RongtongJin merged PR #4272:
URL: https://github.com/apache/rocketmq/pull/4272

