Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/06/20 02:39:40 UTC

[GitHub] [rocketmq] RongtongJin opened a new pull request, #4484: [DISCUSS][RIP-44] Support DLedger Controller

RongtongJin opened a new pull request, #4484:
URL: https://github.com/apache/rocketmq/pull/4484

   ## What is the purpose of the change
   
   RocketMQ 4.5.0 introduced DLedger mode (Raft). In this architecture, the Raft commitlog replaces the original commitlog so that brokers gain failover capability. However, because replication is delegated to Raft, the architecture has some disadvantages:
   
   1. To support failover, a broker group must have at least 3 replicas.
   
   2. Acks from replicas must strictly follow the Raft majority rule: a 3-replica group must receive acks from 2 replicas before returning, and a 5-replica group must receive acks from 3.
   
   3. Because the store depends on OpenMessaging DLedger in DLedger mode, RocketMQ's native storage and replication capabilities (such as transientStorePool and zero-copy) cannot be reused, and maintenance also becomes difficult.
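Constraints 1 and 2 both follow from Raft's quorum arithmetic: with n replicas a write needs floor(n/2) + 1 acks, so the group survives n - (floor(n/2) + 1) failures. The minimal Java sketch below makes this concrete; the class name `RaftQuorum` is hypothetical and not part of RocketMQ or DLedger.

```java
// Hypothetical helper illustrating Raft's majority rule; not a RocketMQ/DLedger class.
final class RaftQuorum {
    private RaftQuorum() {
    }

    /** Smallest number of acks (leader included) that forms a strict majority. */
    static int majority(int replicas) {
        if (replicas < 1) {
            throw new IllegalArgumentException("replicas must be >= 1");
        }
        return replicas / 2 + 1;
    }

    /** Number of replicas that may fail while a majority can still be reached. */
    static int toleratedFailures(int replicas) {
        return replicas - majority(replicas);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.printf("replicas=%d acks=%d toleratedFailures=%d%n",
                    n, majority(n), toleratedFailures(n));
        }
    }
}
```

A 2-replica group tolerates zero failures, which is why failover needs at least 3 replicas; 3 replicas need 2 acks and tolerate 1 failure, and 5 replicas need 3 acks and tolerate 2.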
   
   To address these problems, I would like to propose RIP-44: Support DLedger Controller. With this improvement, the DLedger (Raft) capability is abstracted into an upper layer and becomes an optional, loosely coupled coordination component named DLedger Controller.
   
   Once the DLedger Controller is deployed, the master-slave architecture also gains failover capability. The DLedger Controller can optionally be embedded in the NameServer (the NameServer itself remains stateless, and no election is possible when a majority is down), or it can be deployed independently.
   
   The DLedger Controller is an optional component and does not change the existing operation and maintenance mode. Unlike other components, its downtime does not affect online services. In addition, RIP-44 unifies RocketMQ's storage and replication, which lowers maintenance costs and speeds up development iterations. In terms of compatibility, existing master-slave deployments can be upgraded without compatibility problems.
   
   I've already done part of the work with @hzh0425. Our proposals are available at the links below:
   
   https://docs.google.com/document/d/1tSJkor_3Js4NBaVA0UENGyM8Mh0SrRMXszRyI91hjJ8/edit?usp=sharing
   
   Chinese version:
   
   https://shimo.im/docs/N2A1Mz9QZltQZoAD/
   
   ## Brief changelog
   
   Refer to https://shimo.im/docs/N2A1Mz9QZltQZoAD#anchor-qJhl
   
   ## Verifying this change
   
   Refer to the UTs, ITs, and [testing report](https://shimo.im/docs/0l3NVLR2PdhwJy3R).
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Notice, `it would be helpful if you could finish the following 5 checklist items (the last one is not necessary) before requesting the community to review your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write the necessary unit tests (over 80% coverage) to verify your logic; prefer mocks when cross-module dependencies exist. If a new feature or significant change is committed, please remember to add integration tests in the [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test`  to make sure integration-test pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin commented on pull request #4484: [DISCUSS][RIP-44] Support DLedger Controller

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484#issuecomment-1170928177

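   6.29 review comments:
   
   1. SyncStateSet should be keyed by brokerId (a unique ID the Controller assigns to each broker) rather than by IP; the HA synchronization code needs to change accordingly.
   2. Clarify what InSyncReplicas and minInSyncReplicas mean in Controller mode. When the size of the SyncStateSet is smaller than minInSyncReplicas, replica writes will no longer succeed and PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH is returned.
   3. Stop reusing the NameServer heartbeat; always use a separate Controller heartbeat, and remove the controllerDeployedStandAlone parameter. Add fields such as maxOffset, confirmOffset, and last epoch to the heartbeat packet; these will serve as the basis for elections.
   4. After an election, the SyncStateSet should be inherited instead of being reset to just the new master. One scenario: if the newly elected master crashes right away, there is no master left to elect. (However, given the current implementation, switching to inheritance would lengthen failover if a slave does not connect to the master immediately: the master updates the ISR right away, while a slave must first obtain haMasterAddress from the NameServer registration result before it can connect to the master, which takes some time.) Also watch out for confirmOffset rollback in this case (analyze it first).
   5. The Controller's format should include the RocketMQ message header to make future extension easier.
   6. For the case where the disk is broken but the heartbeat is healthy, PutMessageStatus needs to be subdivided further (so that it can indicate a disk failure). If the disk is broken, an election and switchover are required as well.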
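Item 2 can be pictured as a guard evaluated before each write. This is a sketch under stated assumptions: `SyncStateGuard` and `PutStatusSketch` are hypothetical names (the real status type is RocketMQ's `PutMessageStatus`, of which only two values are mirrored here), and the set is keyed by brokerId as item 1 proposes.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for RocketMQ's PutMessageStatus; only two values are modelled.
enum PutStatusSketch {
    PUT_OK,
    IN_SYNC_REPLICAS_NOT_ENOUGH
}

// Sketch of review item 2: the master rejects writes while its SyncStateSet
// (keyed by brokerId, per item 1) is smaller than minInSyncReplicas.
final class SyncStateGuard {
    private final int minInSyncReplicas;
    private volatile Set<Long> syncStateSet = Collections.emptySet();

    SyncStateGuard(int minInSyncReplicas) {
        if (minInSyncReplicas < 1) {
            throw new IllegalArgumentException("minInSyncReplicas must be >= 1");
        }
        this.minInSyncReplicas = minInSyncReplicas;
    }

    /** Replaces the SyncStateSet with an immutable snapshot of the given brokerIds. */
    void updateSyncStateSet(Set<Long> brokerIds) {
        this.syncStateSet = Collections.unmodifiableSet(new HashSet<>(brokerIds));
    }

    /** Evaluated before appending a message. */
    PutStatusSketch checkBeforePut() {
        return syncStateSet.size() < minInSyncReplicas
                ? PutStatusSketch.IN_SYNC_REPLICAS_NOT_ENOUGH
                : PutStatusSketch.PUT_OK;
    }
}
```

Publishing an immutable snapshot through a volatile field lets the write path read the set without locking while the HA service replaces it.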




[GitHub] [rocketmq] hzh0425 commented on a diff in pull request #4484: [DISCUSS][RIP-44] Support DLedger Controller

Posted by GitBox <gi...@apache.org>.
hzh0425 commented on code in PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484#discussion_r906029037


##########
common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/BrokerRegisterResponseHeader.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class BrokerRegisterResponseHeader implements CommandCustomHeader {

Review Comment:
   BrokerRegisterResponseHeader should be renamed to RegisterBrokerToControllerResponseHeader.





[GitHub] [rocketmq] coveralls commented on pull request #4484: [DISCUSS][RIP-44] Support DLedger Controller

Posted by GitBox <gi...@apache.org>.
coveralls commented on PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484#issuecomment-1159930052

   
   [![Coverage Status](https://coveralls.io/builds/50162909/badge)](https://coveralls.io/builds/50162909)
   
   Coverage increased (+0.2%) to 47.4% when pulling **a8549e24a191327b0c1fd69236f7c0f719729c8d on 5.0.0-beta-dledger-controller** into **cd24a244248db32819474328672868a7ff6ee1f2 on 5.0.0-beta**.
   




[GitHub] [rocketmq] hzh0425 commented on pull request #4484: [RIP-44] Support DLedger Controller

Posted by GitBox <gi...@apache.org>.
hzh0425 commented on PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484#issuecomment-1191249780

   Thanks~




[GitHub] [rocketmq] codecov-commenter commented on pull request #4484: [DISCUSS][RIP-44] Support DLedger Controller

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484#issuecomment-1159928616

   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/4484?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#4484](https://codecov.io/gh/apache/rocketmq/pull/4484?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a8549e2) into [5.0.0-beta](https://codecov.io/gh/apache/rocketmq/commit/a4cccea9db284e92812b2c9f17043a8b4ce589d8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a4cccea) will **increase** coverage by `0.41%`.
   > The diff coverage is `45.49%`.
   
   ```diff
   @@               Coverage Diff                @@
   ##             5.0.0-beta    #4484      +/-   ##
   ================================================
   + Coverage         43.02%   43.43%   +0.41%     
   - Complexity         6016     6444     +428     
   ================================================
     Files               796      846      +50     
     Lines             56945    60443    +3498     
     Branches           7796     8311     +515     
   ================================================
   + Hits              24502    26255    +1753     
   - Misses            29231    30813    +1582     
   - Partials           3212     3375     +163     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/4484?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...java/org/apache/rocketmq/broker/BrokerStartup.java](https://codecov.io/gh/apache/rocketmq/pull/4484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvQnJva2VyU3RhcnR1cC5qYXZh) | `7.64% <0.00%> (-0.05%)` | :arrow_down: |
   | [...he/rocketmq/broker/controller/ReplicasManager.java](https://codecov.io/gh/apache/rocketmq/pull/4484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvY29udHJvbGxlci9SZXBsaWNhc01hbmFnZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...org/apache/rocketmq/broker/out/BrokerOuterAPI.java](https://codecov.io/gh/apache/rocketmq/pull/4484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvb3V0L0Jyb2tlck91dGVyQVBJLmphdmE=) | `19.25% <0.00%> (-2.32%)` | :arrow_down: |
   | [...ocketmq/broker/processor/AdminBrokerProcessor.java](https://codecov.io/gh/apache/rocketmq/pull/4484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL0FkbWluQnJva2VyUHJvY2Vzc29yLmphdmE=) | `25.46% <0.00%> (-0.39%)` | :arrow_down: |
   | [...g/apache/rocketmq/client/impl/MQClientAPIImpl.java](https://codecov.io/gh/apache/rocketmq/pull/4484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9NUUNsaWVudEFQSUltcGwuamF2YQ==) | `24.01% <0.00%> (-0.48%)` | :arrow_down: |
   | [...ava/org/apache/rocketmq/common/BrokerAddrInfo.java](https://codecov.io/gh/apache/rocketmq/pull/4484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vQnJva2VyQWRkckluZm8uamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...a/org/apache/rocketmq/common/ControllerConfig.java](https://codecov.io/gh/apache/rocketmq/pull/4484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vQ29udHJvbGxlckNvbmZpZy5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...in/java/org/apache/rocketmq/common/EpochEntry.java](https://codecov.io/gh/apache/rocketmq/pull/4484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vRXBvY2hFbnRyeS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...c/main/java/org/apache/rocketmq/common/MixAll.java](https://codecov.io/gh/apache/rocketmq/pull/4484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vTWl4QWxsLmphdmE=) | `44.29% <ø> (ø)` | |
   | [...rg/apache/rocketmq/common/constant/LoggerName.java](https://codecov.io/gh/apache/rocketmq/pull/4484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vY29uc3RhbnQvTG9nZ2VyTmFtZS5qYXZh) | `0.00% <ø> (ø)` | |
   | ... and [104 more](https://codecov.io/gh/apache/rocketmq/pull/4484/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/4484?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/4484?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [a4cccea...a8549e2](https://codecov.io/gh/apache/rocketmq/pull/4484?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [rocketmq] ShannonDing commented on a diff in pull request #4484: [RIP-44] Support DLedger Controller

Posted by GitBox <gi...@apache.org>.
ShannonDing commented on code in PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484#discussion_r925113845


##########
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java:
##########
@@ -1351,6 +1359,10 @@ protected void startBasicService() throws Exception {
             this.messageStore.start();
         }
 
+        if (this.replicasManager != null) {
+            this.replicasManager.start();

Review Comment:
   It seems replicasManager is never stopped when the controller shuts down.
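A minimal sketch of the pairing the review asks for, assuming a hypothetical `LifecycleSketch` interface and `BrokerLifecycleSketch` class (neither exists in RocketMQ): every component started in `startBasicService()` is recorded and shut down in reverse order, so `replicasManager` (which uses the message store's HA service) stops before the store.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal lifecycle contract; RocketMQ components expose their own
// start()/shutdown() methods rather than this interface.
interface LifecycleSketch {
    void start();

    void shutdown();
}

// Whatever startBasicService() starts is also stopped by shutdown(), in reverse order.
final class BrokerLifecycleSketch {
    private final List<LifecycleSketch> started = new ArrayList<>();
    private final LifecycleSketch messageStore;
    private final LifecycleSketch replicasManager; // null when not in controller mode

    BrokerLifecycleSketch(LifecycleSketch messageStore, LifecycleSketch replicasManager) {
        this.messageStore = messageStore;
        this.replicasManager = replicasManager;
    }

    void startBasicService() {
        startAndTrack(messageStore);
        if (replicasManager != null) {
            startAndTrack(replicasManager);
        }
    }

    void shutdown() {
        // Reverse start order: dependents stop before what they depend on.
        for (int i = started.size() - 1; i >= 0; i--) {
            started.get(i).shutdown();
        }
        started.clear();
    }

    private void startAndTrack(LifecycleSketch component) {
        component.start();
        started.add(component);
    }
}
```

Tracking what actually started also keeps the null check in one place, so a broker running without a controller never calls shutdown on a missing manager.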





[GitHub] [rocketmq] ShannonDing commented on a diff in pull request #4484: [RIP-44] Support DLedger Controller

Posted by GitBox <gi...@apache.org>.
ShannonDing commented on code in PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484#discussion_r925130724


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.controller;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+
+/**
+ * The manager of broker replicas, including: 0.regularly syncing controller metadata, change controller leader address,
+ * both master and slave will start this timed task. 1.regularly syncing metadata from controllers, and changing broker
+ * roles and master if needed, both master and slave will start this timed task. 2.regularly expanding and Shrinking
+ * syncStateSet, only master will start this timed task.
+ */
+public class ReplicasManager {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private final ScheduledExecutorService scheduledService;
+    private final ExecutorService executorService;
+    private final BrokerController brokerController;
+    private final AutoSwitchHAService haService;
+    private final BrokerConfig brokerConfig;
+    private final String localAddress;
+    private final BrokerOuterAPI brokerOuterAPI;
+    private final List<String> controllerAddresses;
+
+    private volatile String controllerLeaderAddress = "";
+    private volatile State state = State.INITIAL;
+
+    private ScheduledFuture<?> checkSyncStateSetTaskFuture;
+    private ScheduledFuture<?> slaveSyncFuture;
+
+    private Set<String> syncStateSet;
+    private int syncStateSetEpoch = 0;
+    private String masterAddress = "";
+    private int masterEpoch = 0;
+
+    public ReplicasManager(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
+        this.scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
+        this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
+        this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
+        this.brokerConfig = brokerController.getBrokerConfig();
+        final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+        final String controllerPaths = brokerConfig.getControllerAddr();
+        final String[] controllers = controllerPaths.split(";");
+        assert controllers.length > 0;
+        this.controllerAddresses = new ArrayList<>(Arrays.asList(controllers));
+        this.syncStateSet = new HashSet<>();
+        this.localAddress = brokerController.getBrokerAddr();
+        this.haService.setLocalAddress(this.localAddress);
+    }
+
+    enum State {
+        INITIAL,
+        FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
+        RUNNING,
+    }
+
+    public void start() {
+        if (!startBasicService()) {
+            LOGGER.error("Failed to start replicasManager");
+            this.executorService.submit(() -> {
+                int tryTimes = 1;
+                while (!startBasicService()) {
+                    tryTimes++;
+                    LOGGER.error("Failed to start replicasManager, try times:{}, current state:{}, try it again", tryTimes, this.state);
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException ignored) {
+                    }
+                }
+                LOGGER.info("Start replicasManager success, try times:{}", tryTimes);
+            });
+        }
+    }
+
+    private boolean startBasicService() {
+        if (this.state == State.INITIAL) {
+            if (schedulingSyncControllerMetadata()) {
+                LOGGER.info("First time sync controller metadata success");
+                this.state = State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE;
+            } else {
+                return false;
+            }
+        }
+
+        if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
+            if (registerBrokerToController()) {
+                LOGGER.info("First time register broker success");
+                this.state = State.RUNNING;
+            } else {
+                return false;
+            }
+        }
+
+        schedulingSyncBrokerMetadata();
+
+        // Register syncStateSet changed listener.
+        this.haService.registerSyncStateSetChangedListener(this::doReportSyncStateSetChanged);
+        return true;
+    }
+
+    public void shutdown() {
+        this.state = State.INITIAL;
+        this.executorService.shutdown();
+        this.scheduledService.shutdown();
+    }

Review Comment:
   Why not remove the SyncStateSetChangedListener from AutoSwitchHAService?
   It seems the listener will still be called by the HA service after ReplicasManager shuts down.
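One way to address this, sketched with hypothetical classes: `HaListenerRegistry` stands in for `AutoSwitchHAService`, and its `unregisterSyncStateSetChangedListener` method is an assumption, not an existing API. The manager keeps the listener in a field because evaluating a method reference such as `this::doReportSyncStateSetChanged` twice is not guaranteed to produce the same object, so removing by a fresh reference may silently do nothing.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for the HA service's listener registry.
final class HaListenerRegistry {
    private final List<Consumer<Set<String>>> listeners = new CopyOnWriteArrayList<>();

    void registerSyncStateSetChangedListener(Consumer<Set<String>> listener) {
        listeners.add(listener);
    }

    // Assumed method; the PR only shows the register side.
    void unregisterSyncStateSetChangedListener(Consumer<Set<String>> listener) {
        listeners.remove(listener);
    }

    void notifySyncStateSetChanged(Set<String> newSyncStateSet) {
        for (Consumer<Set<String>> listener : listeners) {
            listener.accept(newSyncStateSet);
        }
    }
}

final class ReplicasManagerSketch {
    private final HaListenerRegistry haService;
    private final AtomicInteger reportCount = new AtomicInteger();
    // Created once so that shutdown() removes the very instance start() registered.
    private final Consumer<Set<String>> listener = this::doReportSyncStateSetChanged;

    ReplicasManagerSketch(HaListenerRegistry haService) {
        this.haService = haService;
    }

    void start() {
        haService.registerSyncStateSetChangedListener(listener);
    }

    void shutdown() {
        haService.unregisterSyncStateSetChangedListener(listener);
    }

    int reportCount() {
        return reportCount.get();
    }

    private void doReportSyncStateSetChanged(Set<String> newSyncStateSet) {
        // The real manager would report newSyncStateSet to the controller here.
        reportCount.incrementAndGet();
    }
}
```

A `CopyOnWriteArrayList` lets the HA thread iterate listeners while another thread unregisters without a `ConcurrentModificationException`.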





[GitHub] [rocketmq] hzh0425 commented on a diff in pull request #4484: [DISCUSS][RIP-44] Support DLedger Controller

Posted by GitBox <gi...@apache.org>.
hzh0425 commented on code in PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484#discussion_r906028275


##########
controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl.manager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Broker info, mapping from brokerAddress to {brokerId, brokerHaAddress}.
+ */
+public class BrokerInfo {
+    private final String clusterName;
+    private final String brokerName;
+    // Start from 1
+    private final AtomicLong brokerIdCount;
+    private final HashMap<String/*Address*/, Long/*brokerId*/> brokerIdTable;

Review Comment:
   We are currently using brokerAddress to identify a broker, but this may not work in a container environment, where addresses can change. Perhaps we should use brokerId to identify a broker instead.
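A hedged sketch of the brokerId-based alternative, using a hypothetical `BrokerIdRegistrySketch` (not the PR's `BrokerInfo`): the controller allocates ids starting from 1, as `BrokerInfo` does, and treats the address as a mutable attribute of the id. It assumes the broker persists its id and presents it when re-registering, so a container restarted with a new IP keeps the same identity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry keyed by brokerId instead of by address.
final class BrokerIdRegistrySketch {
    // Last allocated id; ids start from 1 like BrokerInfo's brokerIdCount.
    private long brokerIdCount = 0L;
    private final Map<Long, String> addressById = new HashMap<>();

    /** Allocates a new brokerId for a broker registering for the first time. */
    synchronized long allocate(String address) {
        long brokerId = ++brokerIdCount;
        addressById.put(brokerId, address);
        return brokerId;
    }

    /** A broker that already holds an id re-registers, possibly from a new address. */
    synchronized void reRegister(long brokerId, String newAddress) {
        if (!addressById.containsKey(brokerId)) {
            throw new IllegalArgumentException("unknown brokerId: " + brokerId);
        }
        addressById.put(brokerId, newAddress);
    }

    synchronized String addressOf(long brokerId) {
        return addressById.get(brokerId);
    }
}
```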





[GitHub] [rocketmq] RongtongJin commented on pull request #4484: [RIP-44] Support DLedger Controller

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484#issuecomment-1187616255

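   > 6.29 review comments:
   > 
   > 1. SyncStateSet should be keyed by brokerId (a unique ID the Controller assigns to each broker) rather than by IP; the HA synchronization code needs to change accordingly.
   > 2. Clarify what InSyncReplicas and minInSyncReplicas mean in Controller mode. When the size of the SyncStateSet is smaller than minInSyncReplicas, replica writes will no longer succeed and PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH is returned.
   > 3. Stop reusing the NameServer heartbeat; always use a separate Controller heartbeat, and remove the controllerDeployedStandAlone parameter. Add fields such as maxOffset, confirmOffset, and last epoch to the heartbeat packet; these will serve as the basis for elections.
   > 4. After an election, the SyncStateSet should be inherited instead of being reset to just the new master. One scenario: if the newly elected master crashes right away, there is no master left to elect. (However, given the current implementation, switching to inheritance would lengthen failover if a slave does not connect to the master immediately: the master immediately updates to the new SyncStateSet, while a slave must first obtain haMasterAddress from the NameServer registration result before it can connect to the master, which takes some time (consider maintaining haMasterAddress in the controller).) Also watch out for confirmOffset rollback in this case (analyze it first).
   > 5. The Controller's format should include the RocketMQ message header to make future extension easier.
   > 6. For the case where the disk is broken but the heartbeat is healthy, PutMessageStatus needs to be subdivided further (so that it can indicate a disk failure). If the disk is broken, an election and switchover are required as well.
   
   Items 2 and 3 have been fixed. @hzh0425 is working on the fix for item 1.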
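Items 3 and 4 together suggest an election rule: keep the inherited SyncStateSet and pick, among its live members, the broker whose heartbeat reports the newest epoch and then the largest maxOffset. The sketch below is one plausible ordering, not the rule that was merged; `HeartbeatInfo` and `MasterElectionSketch` are hypothetical, and confirmOffset from item 3 is left out for brevity.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Optional;
import java.util.Set;

// Hypothetical heartbeat payload with the election-relevant fields from item 3.
final class HeartbeatInfo {
    final long brokerId;
    final int lastEpoch;
    final long maxOffset;
    final boolean alive; // e.g. a heartbeat arrived within the timeout

    HeartbeatInfo(long brokerId, int lastEpoch, long maxOffset, boolean alive) {
        this.brokerId = brokerId;
        this.lastEpoch = lastEpoch;
        this.maxOffset = maxOffset;
        this.alive = alive;
    }
}

final class MasterElectionSketch {
    // A newer epoch wins first; within the same epoch, more data wins.
    private static final Comparator<HeartbeatInfo> MOST_UP_TO_DATE =
            Comparator.comparingInt((HeartbeatInfo h) -> h.lastEpoch)
                    .thenComparingLong(h -> h.maxOffset);

    private MasterElectionSketch() {
    }

    /**
     * Elects among live members of the inherited SyncStateSet; empty if none qualifies.
     * The caller keeps the SyncStateSet as-is instead of shrinking it to the winner.
     */
    static Optional<HeartbeatInfo> elect(Set<Long> syncStateSet, Collection<HeartbeatInfo> heartbeats) {
        return heartbeats.stream()
                .filter(h -> h.alive && syncStateSet.contains(h.brokerId))
                .max(MOST_UP_TO_DATE);
    }
}
```

Restricting candidates to the SyncStateSet means a broker with a larger offset but outside the set (possibly holding divergent data) is never chosen.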




[GitHub] [rocketmq] duhenglucky merged pull request #4484: [RIP-44] Support DLedger Controller

Posted by GitBox <gi...@apache.org>.
duhenglucky merged PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484




[GitHub] [rocketmq] hzh0425 commented on a diff in pull request #4484: [DISCUSS][RIP-44] Support DLedger Controller

Posted by GitBox <gi...@apache.org>.
hzh0425 commented on code in PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484#discussion_r906028741


##########
common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/BrokerRegisterRequestHeader.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class BrokerRegisterRequestHeader implements CommandCustomHeader {
+    private String clusterName;

Review Comment:
   BrokerRegisterRequestHeader should be renamed to RegisterBrokerToControllerRequestHeader.





[GitHub] [rocketmq] hzh0425 commented on pull request #4484: [DISCUSS][RIP-44] Support DLedger Controller

Posted by GitBox <gi...@apache.org>.
hzh0425 commented on PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484#issuecomment-1159984705

   tracking issue: https://github.com/apache/rocketmq/issues/4330




[GitHub] [rocketmq] ShannonDing commented on a diff in pull request #4484: [RIP-44] Support DLedger Controller

Posted by GitBox <gi...@apache.org>.
ShannonDing commented on code in PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484#discussion_r925113845


##########
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java:
##########
@@ -1351,6 +1359,10 @@ protected void startBasicService() throws Exception {
             this.messageStore.start();
         }
 
+        if (this.replicasManager != null) {
+            this.replicasManager.start();

Review Comment:
   It seems that replicasManager is never shut down when the BrokerController shuts down.
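The sketch below shows the symmetric start/stop lifecycle this comment asks for. It uses hypothetical stub classes (`BrokerControllerStub`, `MessageStoreStub`, `ReplicasManagerStub`), not the real RocketMQ types. The null check mirrors the one added to `startBasicService()` in the hunk above. The sketch stops the replicas manager after the message store, which is the ordering the author describes in a later reply. It is an illustration only, not the patch that was merged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for BrokerController's components; only the
// start/stop ordering is modeled, not the real RocketMQ classes.
public class ShutdownOrderSketch {

    // Records lifecycle events so the ordering can be inspected.
    static final List<String> EVENTS = new ArrayList<>();

    static class MessageStoreStub {
        void start() { EVENTS.add("store.start"); }
        void shutdown() { EVENTS.add("store.shutdown"); }
    }

    static class ReplicasManagerStub {
        void start() { EVENTS.add("replicas.start"); }
        void shutdown() { EVENTS.add("replicas.shutdown"); }
    }

    static class BrokerControllerStub {
        private final MessageStoreStub messageStore = new MessageStoreStub();
        // Null when the broker does not run in controller mode.
        private final ReplicasManagerStub replicasManager;

        BrokerControllerStub(boolean controllerMode) {
            this.replicasManager = controllerMode ? new ReplicasManagerStub() : null;
        }

        void startBasicService() {
            messageStore.start();
            if (replicasManager != null) {
                replicasManager.start();
            }
        }

        void shutdown() {
            messageStore.shutdown();
            // Same null check as at start-up; the manager is stopped only
            // after the store has stopped.
            if (replicasManager != null) {
                replicasManager.shutdown();
            }
        }
    }

    // Starts and stops one broker and returns the recorded events in order.
    public static List<String> run(boolean controllerMode) {
        EVENTS.clear();
        BrokerControllerStub broker = new BrokerControllerStub(controllerMode);
        broker.startBasicService();
        broker.shutdown();
        return new ArrayList<>(EVENTS);
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

Running `main` prints `[store.start, replicas.start, store.shutdown, replicas.shutdown]` and then `[store.start, store.shutdown]`. A broker that runs without the controller never creates or stops the manager.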





[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4484: [RIP-44] Support DLedger Controller

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on code in PR #4484:
URL: https://github.com/apache/rocketmq/pull/4484#discussion_r925132475


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.controller;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+
+/**
+ * The manager of broker replicas, which runs three timed tasks: 0. regularly syncing controller metadata and updating
+ * the controller leader address (started by both master and slave); 1. regularly syncing metadata from the controllers
+ * and changing the broker role and the master if needed (started by both master and slave); 2. regularly expanding
+ * and shrinking the syncStateSet (started only by the master).
+ */
+public class ReplicasManager {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private final ScheduledExecutorService scheduledService;
+    private final ExecutorService executorService;
+    private final BrokerController brokerController;
+    private final AutoSwitchHAService haService;
+    private final BrokerConfig brokerConfig;
+    private final String localAddress;
+    private final BrokerOuterAPI brokerOuterAPI;
+    private final List<String> controllerAddresses;
+
+    private volatile String controllerLeaderAddress = "";
+    private volatile State state = State.INITIAL;
+
+    private ScheduledFuture<?> checkSyncStateSetTaskFuture;
+    private ScheduledFuture<?> slaveSyncFuture;
+
+    private Set<String> syncStateSet;
+    private int syncStateSetEpoch = 0;
+    private String masterAddress = "";
+    private int masterEpoch = 0;
+
+    public ReplicasManager(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
+        this.scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
+        this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
+        this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
+        this.brokerConfig = brokerController.getBrokerConfig();
+        final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+        final String controllerPaths = brokerConfig.getControllerAddr();
+        final String[] controllers = controllerPaths.split(";");
+        assert controllers.length > 0;
+        this.controllerAddresses = new ArrayList<>(Arrays.asList(controllers));
+        this.syncStateSet = new HashSet<>();
+        this.localAddress = brokerController.getBrokerAddr();
+        this.haService.setLocalAddress(this.localAddress);
+    }
+
+    enum State {
+        INITIAL,
+        FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
+        RUNNING,
+    }
+
+    public void start() {
+        if (!startBasicService()) {
+            LOGGER.error("Failed to start replicasManager");
+            this.executorService.submit(() -> {
+                int tryTimes = 1;
+                while (!startBasicService()) {
+                    tryTimes++;
+                    LOGGER.error("Failed to start replicasManager, try times:{}, current state:{}, try it again", tryTimes, this.state);
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException ignored) {
+                    }
+                }
+                LOGGER.info("Start replicasManager success, try times:{}", tryTimes);
+            });
+        }
+    }
+
+    private boolean startBasicService() {
+        if (this.state == State.INITIAL) {
+            if (schedulingSyncControllerMetadata()) {
+                LOGGER.info("First time sync controller metadata success");
+                this.state = State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE;
+            } else {
+                return false;
+            }
+        }
+
+        if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
+            if (registerBrokerToController()) {
+                LOGGER.info("First time register broker success");
+                this.state = State.RUNNING;
+            } else {
+                return false;
+            }
+        }
+
+        schedulingSyncBrokerMetadata();
+
+        // Register syncStateSet changed listener.
+        this.haService.registerSyncStateSetChangedListener(this::doReportSyncStateSetChanged);
+        return true;
+    }
+
+    public void shutdown() {
+        this.state = State.INITIAL;
+        this.executorService.shutdown();
+        this.scheduledService.shutdown();
+    }

Review Comment:
   I shut down the replicasManager after the message store has shut down, so I don't think it's necessary to remove the listener.
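The sketch below shows why this shutdown ordering can make explicit listener removal unnecessary. It rests on two hypothetical assumptions that are not confirmed by the diff. First, the HA service stops dispatching sync-state-set changes once the store has shut down. Second, the manager's callback hands its work to the manager's executor. `HaServiceStub` and `ReplicasManagerStub` are illustrative stand-ins, not `AutoSwitchHAService` or `ReplicasManager`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

public class ListenerLifecycleSketch {

    // Hypothetical HA service stand-in: once shut down, it no longer
    // dispatches sync-state-set changes to registered listeners.
    static class HaServiceStub {
        private final List<Consumer<Set<String>>> listeners = new CopyOnWriteArrayList<>();
        private volatile boolean running = true;

        void registerSyncStateSetChangedListener(Consumer<Set<String>> listener) {
            listeners.add(listener);
        }

        void notifySyncStateSetChanged(Set<String> newSet) {
            if (!running) {
                return; // no callbacks after shutdown
            }
            for (Consumer<Set<String>> listener : listeners) {
                listener.accept(newSet);
            }
        }

        void shutdown() { running = false; }
    }

    // Hypothetical manager stand-in: its callback submits work to an
    // executor, which rejects new tasks after shutdown().
    static class ReplicasManagerStub {
        final ExecutorService executor = Executors.newSingleThreadExecutor();

        void onSyncStateSetChanged(Set<String> newSet) {
            executor.submit(() -> { /* hypothetical: report newSet to the controller */ });
        }

        void shutdown() { executor.shutdown(); }
    }

    // Returns true if a notification arriving after shutdown was rejected.
    public static boolean lateNotificationRejected(boolean storeFirst) {
        HaServiceStub ha = new HaServiceStub();
        ReplicasManagerStub manager = new ReplicasManagerStub();
        ha.registerSyncStateSetChangedListener(manager::onSyncStateSetChanged);
        if (storeFirst) {
            ha.shutdown();      // store (and its HA service) stops first
        }
        manager.shutdown();     // listener is never removed
        try {
            ha.notifySyncStateSetChanged(Collections.singleton("127.0.0.1:10911"));
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            ha.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("store first, rejected: " + lateNotificationRejected(true));
        System.out.println("manager first, rejected: " + lateNotificationRejected(false));
    }
}
```

With the store stopped first, the late notification is silently dropped. With the manager stopped first, the still-registered listener submits work to an already shut-down executor, and that submission is rejected.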


