Posted to commits@rocketmq.apache.org by "TheR1sing3un (via GitHub)" <gi...@apache.org> on 2023/02/16 16:34:24 UTC

[GitHub] [rocketmq] TheR1sing3un opened a new pull request, #6100: [ISSUE#5989] Support new

TheR1sing3un opened a new pull request, #6100:
URL: https://github.com/apache/rocketmq/pull/6100

   **Make sure to set the target branch to `develop`**
   
   ## What is the purpose of the change
   
   fix [#5989](https://github.com/apache/rocketmq/issues/5989)
   
   ## Brief changelog
   
   XX
   
   ## Verifying this change
   
   XXXX
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Note: `it would be helpful if you could complete the following 5 checklist items (the last one is optional) before requesting that the community review your PR`.
   
   - [x] Make sure there is a [GitHub issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a GitHub issue. Your pull request should address just this issue, without pulling in other changes: one PR resolves one issue.
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write the necessary unit tests (over 80% coverage) to verify your logic; mocking is preferred where cross-module dependencies exist. If a new feature or significant change is committed, please remember to add an integration test in the [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure the basic checks pass. Run `mvn clean install -DskipITs` to make sure the unit tests pass. Run `mvn clean test-compile failsafe:integration-test` to make sure the integration tests pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] TheR1sing3un commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1113839415


##########
tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java:
##########
@@ -45,7 +46,7 @@ public Options buildCommandlineOptions(Options options) {
         opt.setRequired(true);
         options.addOption(opt);
 
-        opt = new Option("b", "brokerAddress", true, "The address of the broker which requires to become master");

Review Comment:
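   I think we could add a command that fetches the mapping. That would be cleaner, and we would not need to change any more logic in the elect-master command.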





[GitHub] [rocketmq] TheR1sing3un commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1113844700


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -108,10 +124,23 @@ public long getConfirmOffset() {
     enum State {
         INITIAL,
         FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
+
+        FIRST_TIME_REGISTER_TO_CONTROLLER_DONE,
+
         RUNNING,
         SHUTDOWN,
     }

Review Comment:
   > Could the enum name be changed to make it more semantically expressive
   
   Yep! I think renaming it to `REGISTER_TO_CONTROLLER_DONE` would be more expressive.





[GitHub] [rocketmq] TheR1sing3un commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1113845676


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -322,35 +438,150 @@ private boolean registerBrokerToController() {
         }
     }
 
+    /**
+     * Send GetNextBrokerRequest to controller for getting next assigning brokerId in this broker-set
+     * @return next brokerId in this broker-set
+     */
+    private Long getNextBrokerId() {
+        try {
+            GetNextBrokerIdResponseHeader nextBrokerIdResp = this.brokerOuterAPI.getNextBrokerId(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.controllerLeaderAddress);
+            return nextBrokerIdResp.getNextBrokerId();
+        } catch (Exception e) {
+            LOGGER.error("fail to get next broker id from controller", e);
+            return null;
+        }
+    }
+
+    /**
+     * Create temp metadata file in local file system, records the brokerId and registerCheckCode
+     * @param brokerId the brokerId that is expected to be assigned
+     * @return whether the temp meta file is created successfully
+     */
+
+    private boolean createTempMetadataFile(Long brokerId) {
+        // generate register check code, format like that: $ipAddress;$timestamp
+        String registerCheckCode = this.localAddress + ";" + System.currentTimeMillis();
+        try {
+            this.tempBrokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerId, registerCheckCode);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("update and persist temp broker metadata file failed", e);
+            this.tempBrokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Send applyBrokerId request to controller
+     * @return whether controller has assigned this brokerId for this broker
+     */
+    private boolean applyBrokerId() {
+        try {
+            ApplyBrokerIdResponseHeader response = this.brokerOuterAPI.applyBrokerId(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(),
+                    tempBrokerMetadata.getBrokerId(), tempBrokerMetadata.getRegisterCheckCode(), this.controllerLeaderAddress);
+            return true;
+
+        } catch (Exception e) {
+            LOGGER.error("fail to apply broker id: {}", tempBrokerMetadata.getBrokerId(), e);
+            return false;
+        }
+    }
+
+    /**
+     * Create metadata file and delete temp metadata file
+     * @return whether process success
+     */
+    private boolean createMetadataFileAndDeleteTemp() {
+        // create metadata file and delete temp metadata file
+        try {
+            this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
+            this.tempBrokerMetadata.clear();
+            this.brokerId = this.brokerMetadata.getBrokerId();
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("fail to create metadata file", e);
+            this.brokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Send registerSuccess request to inform controller that now broker has been registered successfully and controller should update broker ipAddress if changed
+     * @return whether request success
+     */
+    private boolean registerSuccess() {
+        try {
+            RegisterSuccessResponseHeader response = this.brokerOuterAPI.registerSuccess(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerId, localAddress, controllerLeaderAddress);
+            final Long masterBrokerId = response.getMasterBrokerId();
+            final String masterAddress = response.getMasterAddress();
+            if (masterBrokerId == null) {
+                return true;
+            }
+            if (this.brokerId.equals(masterBrokerId)) {
+                changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch());
+            } else {
+                changeToSlave(masterAddress, response.getMasterEpoch(), masterBrokerId);
+            }
+            brokerController.setIsolated(false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("fail to send registerSuccess request to controller", e);
+            return false;
+        }
+    }

Review Comment:
   > 
   
   Yep! I renamed it to `registerBrokerToController`. The registration process is completely finished only once this request succeeds.
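
The registration steps in the hunk above (get the next brokerId, persist temp metadata with a register check code, apply for the id, then promote the temp metadata) can be read as a small retryable sequence. The following is a minimal sketch under stated assumptions, not the PR's implementation: `FakeController`, `RegisterFlowSketch`, and the rule that the controller accepts a repeated apply with the same check code are all hypothetical, and plain fields stand in for the metadata files.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the controller; the real logic lives in the PR's controller module.
final class FakeController {
    private long nextBrokerId = 1L;
    private final Map<Long, String> checkCodeByBrokerId = new HashMap<>();

    long getNextBrokerId() {
        return nextBrokerId;
    }

    // Assumed rule: grant the id if it is the next free one, or if the same
    // register check code applies again (a retry after a failure).
    boolean applyBrokerId(long brokerId, String registerCheckCode) {
        String existing = checkCodeByBrokerId.get(brokerId);
        if (existing != null) {
            return existing.equals(registerCheckCode);
        }
        if (brokerId != nextBrokerId) {
            return false;
        }
        checkCodeByBrokerId.put(brokerId, registerCheckCode);
        nextBrokerId++;
        return true;
    }
}

// Hypothetical broker-side flow; the fields stand in for the temp and final metadata files.
final class RegisterFlowSketch {
    private final FakeController controller;
    private final String localAddress;
    private Long tempBrokerId;
    private String registerCheckCode;
    private Long brokerId;

    RegisterFlowSketch(FakeController controller, String localAddress) {
        this.controller = controller;
        this.localAddress = localAddress;
    }

    Long getBrokerId() {
        return brokerId;
    }

    // Returns true once the broker holds a confirmed brokerId; safe to call again after a failure.
    boolean register() {
        if (brokerId != null) {
            return true; // final metadata already exists
        }
        if (tempBrokerId == null) {
            // Ask for the next id and record it with a check code ($ipAddress;$timestamp).
            tempBrokerId = controller.getNextBrokerId();
            registerCheckCode = localAddress + ";" + System.currentTimeMillis();
        }
        // Ask the controller to confirm the id.
        if (!controller.applyBrokerId(tempBrokerId, registerCheckCode)) {
            tempBrokerId = null; // discard temp metadata and start over on the next call
            registerCheckCode = null;
            return false;
        }
        // Promote temp metadata to final metadata.
        brokerId = tempBrokerId;
        tempBrokerId = null;
        registerCheckCode = null;
        return true;
    }

    public static void main(String[] args) {
        FakeController controller = new FakeController();
        RegisterFlowSketch a = new RegisterFlowSketch(controller, "10.0.0.1:30911");
        RegisterFlowSketch b = new RegisterFlowSketch(controller, "10.0.0.2:30911");
        System.out.println(a.register() + " " + a.getBrokerId());
        System.out.println(b.register() + " " + b.getBrokerId());
    }
}
```

Keeping the temp metadata until the controller confirms the id is what makes a retry possible in this sketch: a broker that fails midway can apply again with the same check code instead of consuming a new id.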





[GitHub] [rocketmq] mxsm commented on pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "mxsm (via GitHub)" <gi...@apache.org>.
mxsm commented on PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#issuecomment-1439380587

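   > Is there a way for this design to support a reasonably compatible upgrade? For example: 1. Upgrade the controller component first (this may require stopping all controllers and deleting their data before upgrading); once that is done, brokers cannot take part in elections but can still work normally (the minimum requirement). 2. Upgrade the Broker component, ensuring that brokers come back online normally after the upgrade without losing data (ideally preserving the master/slave relationship).
   
   @TheR1sing3un As a follow-up, could you update the corresponding documents (Chinese and English) under https://github.com/apache/rocketmq/tree/develop/docs/cn/controller?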




[GitHub] [rocketmq] TheR1sing3un commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1113846222


##########
controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl.manager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+
+/**
+ * Broker replicas info, mapping from brokerId to {ipAddress, registerCheckCode}.
+ */
+public class BrokerReplicaInfo {
+    private final String clusterName;
+
+    private final String brokerName;
+
+    // Start from 1
+    private final AtomicLong nextAssignBrokerId;
+
+    private final HashMap<Long/*brokerId*/, Pair<String/*ipAddress*/, String/*registerCheckCode*/>> brokerIdInfo;
+
+    public BrokerReplicaInfo(String clusterName, String brokerName) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.nextAssignBrokerId = new AtomicLong(MixAll.FIRST_SLAVE_ID);

Review Comment:
   > Using FIRST_SLAVE_ID here is not semantically ideal. This is the brokerControllerId numbering and has nothing to do with slaveId.
   
   Yep! I renamed it to `FIRST_BROKER_CONTROLLER_ID`!
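
To make the data structure in this hunk concrete, here is a minimal sketch of a per-broker-set id table. It reuses the field names from the diff (`nextAssignBrokerId`, `brokerIdInfo`) and the constant name from the reply; the value `1L` follows the "Start from 1" comment, while the class name and every method are assumptions for illustration, not the PR's API.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: field names follow the diff; the class name and all methods are assumptions.
final class BrokerReplicaInfoSketch {
    static final long FIRST_BROKER_CONTROLLER_ID = 1L; // the diff comments "Start from 1"

    private final AtomicLong nextAssignBrokerId = new AtomicLong(FIRST_BROKER_CONTROLLER_ID);
    // brokerId -> (ipAddress, registerCheckCode)
    private final Map<Long, SimpleImmutableEntry<String, String>> brokerIdInfo = new HashMap<>();

    long getNextAssignBrokerId() {
        return nextAssignBrokerId.get();
    }

    // Records the broker and advances the counter; returns the id that was assigned.
    long assignNext(String ipAddress, String registerCheckCode) {
        long id = nextAssignBrokerId.getAndIncrement();
        brokerIdInfo.put(id, new SimpleImmutableEntry<>(ipAddress, registerCheckCode));
        return id;
    }

    // Returns the recorded address, or null if the id is unknown.
    String getBrokerAddress(long brokerId) {
        SimpleImmutableEntry<String, String> info = brokerIdInfo.get(brokerId);
        return info == null ? null : info.getKey();
    }

    // A broker may come back with a new address; its id stays the same.
    void updateBrokerAddress(long brokerId, String ipAddress) {
        SimpleImmutableEntry<String, String> info = brokerIdInfo.get(brokerId);
        if (info != null) {
            brokerIdInfo.put(brokerId, new SimpleImmutableEntry<>(ipAddress, info.getValue()));
        }
    }
}
```

Keying the table by brokerId rather than by address is the point of the change under review: the address becomes an attribute that can be updated, while the id remains the stable identity.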





[GitHub] [rocketmq] TheR1sing3un commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1113850068


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class TempBrokerMetadata extends MetadataFile {
+
+    private String clusterName;
+

Review Comment:
   > How about TempBrokerMetadata extends BrokerMetadata
   
   Looks good! Done~





[GitHub] [rocketmq] TheR1sing3un commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1125868400


##########
controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java:
##########
@@ -77,100 +81,140 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
                 request);
         }
         switch (request.getCode()) {
-            case CONTROLLER_ALTER_SYNC_STATE_SET: {
-                final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
-                final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet);
-                if (future != null) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
-            }
-            case CONTROLLER_ELECT_MASTER: {
-                final ElectMasterRequestHeader electMasterRequest = (ElectMasterRequestHeader) request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().electMaster(electMasterRequest);
-                if (future != null) {
-                    final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                    final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
-
-                    if (null != responseHeader) {
-                        if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
-                            this.controllerManager.notifyBrokerRoleChanged(responseHeader, electMasterRequest.getClusterName());
-                        }
-                    }
-                    return response;
-                }
-                break;
-            }
-            case CONTROLLER_REGISTER_BROKER: {
-                final RegisterBrokerToControllerRequestHeader controllerRequest = (RegisterBrokerToControllerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().registerBroker(controllerRequest);
-                if (future != null) {
-                    final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                    final RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
-                    if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
-                        this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
-                            responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
-                            controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(), controllerRequest.getElectionPriority());
-                    }
-                    return response;
-                }
-                break;
-            }
-            case CONTROLLER_GET_REPLICA_INFO: {
-                final GetReplicaInfoRequestHeader controllerRequest = (GetReplicaInfoRequestHeader) request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getReplicaInfo(controllerRequest);
-                if (future != null) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
-            }
-            case CONTROLLER_GET_METADATA_INFO: {
-                return this.controllerManager.getController().getControllerMetadata();
-            }
-            case BROKER_HEARTBEAT: {
-                final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
-                this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
-                    requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
-                return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
-            }
-            case CONTROLLER_GET_SYNC_STATE_DATA: {
-                if (request.getBody() != null) {
-                    final List<String> brokerNames = RemotingSerializable.decode(request.getBody(), List.class);
-                    if (brokerNames != null && brokerNames.size() > 0) {
-                        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getSyncStateData(brokerNames);
-                        if (future != null) {
-                            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                        }
-                    }
-                }
-                break;
-            }
+            case CONTROLLER_ALTER_SYNC_STATE_SET:
+                return this.handleAlterSyncStateSet(ctx, request);
+            case CONTROLLER_ELECT_MASTER:
+                return this.handleControllerElectMaster(ctx, request);
+            case CONTROLLER_GET_REPLICA_INFO:
+                return this.handleControllerGetReplicaInfo(ctx, request);
+            case CONTROLLER_GET_METADATA_INFO:
+                return this.handleControllerGetMetadataInfo(ctx, request);
+            case BROKER_HEARTBEAT:
+                return this.handleBrokerHeartbeat(ctx, request);
+            case CONTROLLER_GET_SYNC_STATE_DATA:
+                return this.handleControllerGetSyncStateData(ctx, request);
             case UPDATE_CONTROLLER_CONFIG:
-                return this.updateControllerConfig(ctx, request);
+                return this.handleUpdateControllerConfig(ctx, request);
             case GET_CONTROLLER_CONFIG:
-                return this.getControllerConfig(ctx, request);
+                return this.handleGetControllerConfig(ctx, request);
             case CLEAN_BROKER_DATA:
-                final CleanControllerBrokerDataRequestHeader requestHeader = (CleanControllerBrokerDataRequestHeader) request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().cleanBrokerData(requestHeader);
-                if (null != future) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
+                return this.handleCleanBrokerData(ctx, request);
+            case CONTROLLER_GET_NEXT_BROKER_ID:
+                return this.handleGetNextBrokerId(ctx, request);
+            case CONTROLLER_APPLY_BROKER_ID:
+                return this.handleApplyBrokerId(ctx, request);
+            case CONTROLLER_REGISTER_BROKER:
+                return this.handleRegisterBroker(ctx, request);
             default: {
                 final String error = " request type " + request.getCode() + " not supported";
                 return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
             }
         }
+    }
+
+    private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx,
+        RemotingCommand request) throws Exception {
+        final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
+        final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
         return RemotingCommand.createResponseCommand(null);
     }
 
-    @Override
-    public boolean rejectRequest() {
-        return false;
+    private RemotingCommand handleControllerElectMaster(ChannelHandlerContext ctx,
+        RemotingCommand request) throws Exception {
+        final ElectMasterRequestHeader electMasterRequest = (ElectMasterRequestHeader) request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().electMaster(electMasterRequest);
+        if (future != null) {
+            final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
+                    this.controllerManager.notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(response));
+                }
+            }
+            return response;
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleControllerGetReplicaInfo(ChannelHandlerContext ctx,
+                                                           RemotingCommand request) throws Exception {
+        final GetReplicaInfoRequestHeader controllerRequest = (GetReplicaInfoRequestHeader) request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getReplicaInfo(controllerRequest);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleControllerGetMetadataInfo(ChannelHandlerContext ctx, RemotingCommand request) {
+        return this.controllerManager.getController().getControllerMetadata();
+    }
+
+    private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
+        if (requestHeader.getBrokerId() == null) {
+            return RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST, "Heart beat with empty brokerId");
+        }
+        this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
+                requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
+        return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
+    }
+
+    private RemotingCommand handleControllerGetSyncStateData(ChannelHandlerContext ctx,
+                                                             RemotingCommand request) throws Exception {
+        if (request.getBody() != null) {
+            final List<String> brokerNames = RemotingSerializable.decode(request.getBody(), List.class);
+            if (brokerNames != null && brokerNames.size() > 0) {
+                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getSyncStateData(brokerNames);
+                if (future != null) {
+                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                }
+            }
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleCleanBrokerData(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final CleanControllerBrokerDataRequestHeader requestHeader = (CleanControllerBrokerDataRequestHeader) request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().cleanBrokerData(requestHeader);
+        if (null != future) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
     }
 
-    private RemotingCommand updateControllerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+    private RemotingCommand handleGetNextBrokerId(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final GetNextBrokerIdRequestHeader requestHeader = (GetNextBrokerIdRequestHeader) request.decodeCommandCustomHeader(GetNextBrokerIdRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getNextBrokerId(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleApplyBrokerId(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final ApplyBrokerIdRequestHeader requestHeader = (ApplyBrokerIdRequestHeader) request.decodeCommandCustomHeader(ApplyBrokerIdRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = this.controllerManager.getController().applyBrokerId(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleRegisterBroker(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        RegisterBrokerToControllerRequestHeader requestHeader = (RegisterBrokerToControllerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = this.controllerManager.getController().registerBroker(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }

Review Comment:
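   > Could a successful registration also count as a heartbeat, as it did before?
   
   That was my original plan, but I found that too many heartbeat parameters would have to be added, and this interface is only used in the registration process (during registration we also trigger heartbeats manually to keep the current node alive). Once registration succeeds, the scheduled heartbeat task starts.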
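
The refactoring in this hunk moves each `case` body into its own `handleXxx` method, and every handler repeats the same "wait on the future with a timeout, otherwise return a default response" step. The sketch below illustrates that shape in isolation. The request codes, the `String` responses, and the shared `await` helper are placeholders chosen for this example; the PR itself keeps the null check inline in each handler and uses `RemotingCommand`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: request codes, handler bodies, and String responses are placeholders.
final class DispatchSketch {
    static final int GET_NEXT_BROKER_ID = 1;
    static final int APPLY_BROKER_ID = 2;
    static final long WAIT_TIMEOUT_SECONDS = 5;

    // The switch only routes; each request type has its own handler method.
    static String processRequest(int code, String brokerName) {
        switch (code) {
            case GET_NEXT_BROKER_ID:
                return handleGetNextBrokerId(brokerName);
            case APPLY_BROKER_ID:
                return handleApplyBrokerId(brokerName);
            default:
                return "request type " + code + " not supported";
        }
    }

    private static String handleGetNextBrokerId(String brokerName) {
        return await(CompletableFuture.completedFuture("next id for " + brokerName));
    }

    private static String handleApplyBrokerId(String brokerName) {
        // A null future models a controller call that produced nothing to wait on.
        return await(null);
    }

    // Shared form of the "if (future != null) return future.get(timeout)" block in the diff.
    private static String await(CompletableFuture<String> future) {
        if (future == null) {
            return "empty response";
        }
        try {
            return future.get(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return "request interrupted";
        } catch (ExecutionException | TimeoutException e) {
            return "request failed: " + e;
        }
    }
}
```

Factoring the wait into one helper is a variation on the diff, not a description of it; it is shown here only because the repeated block is identical across handlers.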





[GitHub] [rocketmq] RongtongJin merged pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin merged PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100




[GitHub] [rocketmq] TheR1sing3un commented on pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#issuecomment-1456378551

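   ## Upgrade plan
   Upgrades from versions below 5.0 follow the previous upgrade steps.
   Upgrades from 5.0 and above must follow the steps below.
   ### Upgrading the controller
   Because we changed many internal properties of the controller's state machine, as well as the brokerId allocation logic in the new version, the controller must be upgraded.
   
   1. Take the old-version Controller offline.
   2. Clear the Controller data, that is, the DLedger data files, located by default in `~/DLedgerController`.
   3. Bring the new-version Controller online.
   ### Upgrading the Broker
   
   1. Stop the Broker slave nodes.
   2. Stop the Broker master node.
   3. Delete the Epoch files of every Broker, by default `~/store/epochFileCheckpoint` and `~/store/epochFileCheckpoint.bak`.
   4. Bring the former master Broker online first and wait until it is elected master. (The admin command `getSyncStateSet` can be used to check this.)
   5. Bring all the former slave Brokers online.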
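
Step 3 of the Broker upgrade could be scripted along these lines. This is a minimal sketch that assumes the epoch checkpoint is stored as two regular files with the default names quoted above; `EpochFileCleaner` is a hypothetical helper, not a RocketMQ tool, and it should only be run while the broker is stopped.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of RocketMQ.
final class EpochFileCleaner {
    // Default file names taken from the upgrade steps; adjust if the store path is customised.
    private static final String[] EPOCH_FILE_NAMES = {"epochFileCheckpoint", "epochFileCheckpoint.bak"};

    // Deletes the epoch files under storeDir and returns how many were actually removed.
    static int deleteEpochFiles(Path storeDir) throws IOException {
        int deleted = 0;
        for (String name : EPOCH_FILE_NAMES) {
            if (Files.deleteIfExists(storeDir.resolve(name))) {
                deleted++;
            }
        }
        return deleted;
    }

    public static void main(String[] args) throws IOException {
        // The store directory must be passed explicitly so nothing is deleted by accident.
        if (args.length != 1) {
            System.err.println("usage: EpochFileCleaner <storeDir>");
            return;
        }
        Path storeDir = Paths.get(args[0]);
        System.out.println("deleted " + deleteEpochFiles(storeDir) + " epoch file(s) under " + storeDir);
    }
}
```

The method is idempotent: running it a second time deletes nothing and returns 0, so it is safe to repeat on every broker of the set.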
   ### Testing
   > Start a namesrv
   
   ```bash
   nohup sh bin/mqnamesrv &
   ```
   > Start an old-version controller
   
   ```bash
   nohup sh bin/mqcontroller -c ./conf/controller/controller-standalone.conf &
   ```
   > Check that the controller started correctly
   
   ```bash
   sh bin/mqadmin getControllerMetaData -a localhost:9878
   ```
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678012810122-7f5d01a0-1eb3-466e-a7b8-ef6ee97e356a.png#averageHue=%234e676d&clientId=ud9575b57-da39-4&from=paste&height=96&id=u850cf54b&name=image.png&originHeight=192&originWidth=1302&originalType=binary&ratio=2&rotation=0&showTitle=false&size=43942&status=done&style=none&taskId=u73d385e5-3e9e-4ebc-963e-032646b4270&title=&width=651)
   > Start the old-version broker0, then broker1
   
   ```bash
   nohup sh bin/mqbroker -c conf/controller/quick-start/broker-n0.conf &
   ```
   ```bash
   nohup sh bin/mqbroker -c conf/controller/quick-start/broker-n1.conf &
   ```
   > Check the cluster state
   
   ```bash
   sh bin/mqadmin getSyncStateSet -a localhost:9878 -b broker-a
   ```
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678013025645-b6fda039-52dc-4b23-a34a-87b58855941e.png#averageHue=%235c747a&clientId=ud9575b57-da39-4&from=paste&height=179&id=u1ec434c0&name=image.png&originHeight=358&originWidth=1428&originalType=binary&ratio=2&rotation=0&showTitle=false&size=100913&status=done&style=none&taskId=uf6011a80-3557-41e9-b4dc-96f0e7d003c&title=&width=714)
   > Send a message
   
   ```bash
   sh bin/mqadmin sendMessage -p "hello" -n localhost:9876 -b broker-a -t default
   ```
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678013133315-1e0cbce6-0847-45e4-8c72-2bb5a3630208.png#averageHue=%23566f75&clientId=ud9575b57-da39-4&from=paste&height=49&id=u90e9c08c&name=image.png&originHeight=98&originWidth=1726&originalType=binary&ratio=2&rotation=0&showTitle=false&size=44828&status=done&style=none&taskId=u861d9c17-bbe3-46fb-98a9-7ce005f3940&title=&width=863)
   > Check that both nodes appended the message correctly
   
   ```bash
   sh bin/mqadmin getBrokerEpoch -n localhost:9876 -b broker-a
   ```
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678013175260-70a46313-9cf9-4fcf-b151-ef220cb5253e.png#averageHue=%2350696f&clientId=ud9575b57-da39-4&from=paste&height=215&id=u42ac8d2a&name=image.png&originHeight=430&originWidth=1420&originalType=binary&ratio=2&rotation=0&showTitle=false&size=97189&status=done&style=none&taskId=u922aeb68-ccc8-4824-8d3c-7b179030211&title=&width=710)
   > Take the controller offline
   
   Kill the controller process.
   Test whether messages can still be sent and received at this point.
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678013357365-98898493-1c03-461e-8c25-f7e93d888cf8.png#averageHue=%235e767c&clientId=ud9575b57-da39-4&from=paste&height=497&id=ub885239d&name=image.png&originHeight=994&originWidth=1816&originalType=binary&ratio=2&rotation=0&showTitle=false&size=459876&status=done&style=none&taskId=u583866f7-4a5e-44ce-bcd5-feb8c406d06&title=&width=908)
   > Clear the controller data files
   
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678013416586-affba0ec-0d14-4bf8-ae54-333947148872.png#averageHue=%23315358&clientId=ud9575b57-da39-4&from=paste&height=33&id=u80a5c297&name=image.png&originHeight=66&originWidth=612&originalType=binary&ratio=2&rotation=0&showTitle=false&size=13044&status=done&style=none&taskId=u6e2226c8-9fb9-4de3-a380-bb773fd2f44&title=&width=306)
   > Bring the new-version controller online
   
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678013472886-9f2c9abc-90a5-4eca-90e4-eaeb43f64ff2.png#averageHue=%23556e76&clientId=ud9575b57-da39-4&from=paste&height=48&id=u15e42f28&name=image.png&originHeight=96&originWidth=1672&originalType=binary&ratio=2&rotation=0&showTitle=false&size=25642&status=done&style=none&taskId=u3ab8189a-3251-48a8-b6f5-f82c11f7255&title=&width=836)
   > Check that messages can still be sent and received
   
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678013562958-42c95028-940e-492a-a5fc-18da5bca7ec0.png#averageHue=%235e767c&clientId=ud9575b57-da39-4&from=paste&height=506&id=u5c1f6336&name=image.png&originHeight=1012&originWidth=1812&originalType=binary&ratio=2&rotation=0&showTitle=false&size=477314&status=done&style=none&taskId=u2ea78422-6f17-4e08-ab19-045bd100fc2&title=&width=906)
   > Stop the broker slave node first, then the master node
   
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678013669176-c0543c93-6134-41af-af22-50b04f275e3c.png#averageHue=%23536d73&clientId=ud9575b57-da39-4&from=paste&height=255&id=u5720825d&name=image.png&originHeight=510&originWidth=1554&originalType=binary&ratio=2&rotation=0&showTitle=false&size=187565&status=done&style=none&taskId=uae465fbf-e76d-4c18-aebc-c117970f94a&title=&width=777)
   > Clear the epoch files of every broker
   
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678013770831-807c032c-5ca6-4caa-9c4b-2b3ddb4a65fb.png#averageHue=%23425b61&clientId=ud9575b57-da39-4&from=paste&height=196&id=uf4803db9&name=image.png&originHeight=392&originWidth=970&originalType=binary&ratio=2&rotation=0&showTitle=false&size=98429&status=done&style=none&taskId=u60979364-d48d-4456-814d-503acfe1a6e&title=&width=485)
   > Upgrade the former master broker and bring it online first
   
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678014506505-9a956c74-5570-4f81-a6f2-be61559e8553.png#averageHue=%2308242c&clientId=ud9575b57-da39-4&from=paste&height=305&id=uf1804c38&name=image.png&originHeight=610&originWidth=1670&originalType=binary&ratio=2&rotation=0&showTitle=false&size=216539&status=done&style=none&taskId=uaadd396e-f9d2-42c3-b452-3d363d078f9&title=&width=835)
   > Upgrade the slave broker and bring it online
   
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678017910996-5de2766f-fa0d-4409-90ae-68353bafbb15.png#averageHue=%230b262e&clientId=ud9575b57-da39-4&from=paste&height=424&id=ube1f28f4&name=image.png&originHeight=848&originWidth=1682&originalType=binary&ratio=2&rotation=0&showTitle=false&size=317231&status=done&style=none&taskId=u27d94d6f-3598-4b23-bebb-027e967b25f&title=&width=841)
   > Test sending and receiving messages
   
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678018071178-e1d93327-478c-4ec4-bb6c-e430e6372e02.png#averageHue=%23173139&clientId=ud9575b57-da39-4&from=paste&height=528&id=u6ad3be5c&name=image.png&originHeight=1056&originWidth=1776&originalType=binary&ratio=2&rotation=0&showTitle=false&size=610731&status=done&style=none&taskId=ub7b2f581-7382-415d-808b-caaf4f21580&title=&width=888)
   > Bring a new node, broker2, online
   
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678018461655-f5989043-01fe-4c12-8559-9ea5fe83077e.png#averageHue=%230b282f&clientId=ud9575b57-da39-4&from=paste&height=518&id=u7a6f7c91&name=image.png&originHeight=1036&originWidth=1690&originalType=binary&ratio=2&rotation=0&showTitle=false&size=358186&status=done&style=none&taskId=u71e144c6-4227-49b7-8fc4-9d4cd4cd558&title=&width=845)
   > Take broker0 offline to trigger a master switch
   
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678018568780-ec9dd81f-8689-4d22-876e-c6c7b3a2beb8.png#averageHue=%23122d34&clientId=ud9575b57-da39-4&from=paste&height=470&id=u99925140&name=image.png&originHeight=940&originWidth=1808&originalType=binary&ratio=2&rotation=0&showTitle=false&size=407429&status=done&style=none&taskId=uac5e52c5-ae22-4bdf-80a1-bbff199e974&title=&width=904)
   broker1 becomes master
   > Take broker1 offline to trigger a master switch
   
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678018645750-f744e4a8-d46b-436c-9894-cc7c7e051a85.png#averageHue=%23566f75&clientId=ud9575b57-da39-4&from=paste&height=278&id=u416ead59&name=image.png&originHeight=556&originWidth=1808&originalType=binary&ratio=2&rotation=0&showTitle=false&size=155092&status=done&style=none&taskId=u62d7a756-35f0-4b8f-b32e-c3fd7c98bca&title=&width=904)
   > Test sending and receiving messages
   
   ![image.png](https://cdn.nlark.com/yuque/0/2023/png/22446956/1678018786653-ce8b93f5-56e0-4003-b87c-427f05ea4d5e.png#averageHue=%23627a80&clientId=ud9575b57-da39-4&from=paste&height=532&id=u5fb5b97f&name=image.png&originHeight=1064&originWidth=2034&originalType=binary&ratio=2&rotation=0&showTitle=false&size=482841&status=done&style=none&taskId=ud1cccc3a-7a9c-4dda-923f-f156a282adc&title=&width=1017)
   > Restart broker0 and broker1
   
   <img width="806" alt="image" src="https://user-images.githubusercontent.com/87409330/223159422-142bc372-be99-4440-95ba-9bf222bffd6c.png">
   
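   ### Compatibility
   
   |  | Old 5.0 Controller | New Controller |
   | --- | --- | --- |
   | Old 5.0 Broker | Runs normally; master switch works | Runs normally if the master and slave roles are already settled, but master switch does not work. A restarted broker cannot come online |
   | New Broker | Cannot come online | Runs normally; master switch works |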
   
   




[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1127201779


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -148,8 +178,24 @@ private boolean startBasicService() {
         }
 
         if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
-            if (registerBrokerToController()) {
-                LOGGER.info("First time register broker success");
+            for (int retryTimes = 0; retryTimes < 5; retryTimes++) {
+                if (register()) {
+                    LOGGER.info("First time register broker success");
+                    this.state = State.REGISTER_TO_CONTROLLER_DONE;
+                    break;
+                }
+            }
+            // register 5 times but still unsuccessful
+            if (this.state != State.REGISTER_TO_CONTROLLER_DONE) {
+                return false;
+            }
+        }
+
+        if (this.state == State.REGISTER_TO_CONTROLLER_DONE) {
+            // The scheduled task for heartbeat sending is not starting now, so we should manually send heartbeat request
+            this.sendHeartbeatToController();
+            if (this.masterBrokerId != null || brokerElect()) {
+                LOGGER.info("Master in this broker set is elected");

Review Comment:
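   There is a concurrency problem here. Suppose the node's first startup fails (for example, because there is no master in the broker set), and in the meantime an election succeeds and the node is notified to become a Slave (via notifyBrokerRoleChange). On the second startup this code skips brokerElect entirely, so isIsolated stays true and the Slave node cannot come online.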
   ![image](https://user-images.githubusercontent.com/21963954/223287056-a9a8500b-c93a-438f-b611-80a45bebed75.png)
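
The interleaving described in this comment can be reproduced with a heavily simplified model. The field and method names mirror the diff, but `StartupSketch` is hypothetical and is not the RocketMQ class; `startWithGuard()` shows one possible guard and is an assumption, not necessarily the fix the PR adopted.

```java
// Hypothetical, simplified model of the startup path discussed in the review comment.
final class StartupSketch {
    Long masterBrokerId;      // set by a role-change notification or by an election
    boolean isolated = true;  // true means the broker does not register to the namesrv
    int electCalls = 0;

    // Stand-in for brokerElect(): learns the master and clears the isolated flag.
    boolean brokerElect() {
        electCalls++;
        masterBrokerId = 1L;
        isolated = false;
        return true;
    }

    // Stand-in for notifyBrokerRoleChange arriving between two startup attempts.
    // As described by the reviewer, this path records the master but leaves isolated untouched.
    void onRoleChangeNotified(long newMasterId) {
        masterBrokerId = newMasterId;
    }

    // Condition as written in the diff: the short-circuit skips brokerElect().
    boolean startAsInDiff() {
        return masterBrokerId != null || brokerElect();
    }

    // One possible guard: leave isolation explicitly when the master is already known.
    boolean startWithGuard() {
        if (masterBrokerId != null) {
            isolated = false;
            return true;
        }
        return brokerElect();
    }
}
```

With the condition from the diff, a broker that was notified before its second startup reports success while `isolated` is still true; with the guard, the same sequence ends with `isolated` set to false.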
   





[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1126054850


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -296,61 +347,241 @@ private void handleSlaveSynchronize(final BrokerRole role) {
         }
     }
 
-    private boolean registerBrokerToController() {
-        // Register this broker to controller, get brokerId and masterAddress.
+    private boolean brokerElect() {
+        // Broker try to elect itself as a master in broker set.
         try {
-            final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
-                this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getControllerHeartBeatTimeoutMills(),
-                this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset(), this.brokerConfig.getBrokerElectionPriority());
-            final String newMasterAddress = registerResponse.getMasterAddress();
-            if (StringUtils.isNoneEmpty(newMasterAddress)) {
-                if (StringUtils.equals(newMasterAddress, this.localAddress)) {
-                    changeToMaster(registerResponse.getMasterEpoch(), registerResponse.getSyncStateSetEpoch());
-                } else {
-                    changeToSlave(newMasterAddress, registerResponse.getMasterEpoch(), registerResponse.getBrokerId());
-                }
-                // Set isolated to false, make broker can register to namesrv regularly
-                brokerController.setIsolated(false);
-            } else {
-                LOGGER.warn("No master in controller");
+            ElectMasterResponseHeader tryElectResponse = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(),
+                this.brokerConfig.getBrokerName(), this.brokerControllerId);
+            final String masterAddress = tryElectResponse.getMasterAddress();
+            final Long masterBrokerId = tryElectResponse.getMasterBrokerId();
+            if (StringUtils.isEmpty(masterAddress) || masterBrokerId == null) {
+                LOGGER.warn("Now no master in broker set");
                 return false;
             }
+
+            if (masterBrokerId.equals(this.brokerControllerId)) {
+                changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch());
+            } else {
+                changeToSlave(masterAddress, tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId());
+            }
+            brokerController.setIsolated(false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("Failed to try elect", e);
+            return false;
+        }
+    }
+
+    public void sendHeartbeatToController() {
+        final List<String> controllerAddresses = this.getAvailableControllerAddresses();
+        for (String controllerAddress : controllerAddresses) {
+            if (StringUtils.isNotEmpty(controllerAddress)) {
+                this.brokerOuterAPI.sendHeartbeatToController(
+                        controllerAddress,
+                        this.brokerConfig.getBrokerClusterName(),
+                        this.brokerAddress,
+                        this.brokerConfig.getBrokerName(),
+                        this.brokerControllerId,
+                        this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+                        this.brokerConfig.isInBrokerContainer(), this.getLastEpoch(),
+                        this.brokerController.getMessageStore().getMaxPhyOffset(),
+                        this.getConfirmOffset(),
+                        this.brokerConfig.getControllerHeartBeatTimeoutMills(),
+                        this.brokerConfig.getBrokerElectionPriority()
+                );
+            }
+        }
+    }
+
+    /**
+     * Register broker to controller, and persist the metadata to file
+     * @return whether registering process succeeded
+     */
+    private boolean register() {
+        try {
+            // 1. confirm now registering state
+            confirmNowRegisteringState();
+            // 2. get next assigning brokerId, and create temp metadata file
+            if (this.registerState == RegisterState.INITIAL) {
+                Long nextBrokerId = getNextBrokerId();
+                if (nextBrokerId == null || !createTempMetadataFile(nextBrokerId)) {
+                    return false;
+                }
+                this.registerState = RegisterState.CREATE_TEMP_METADATA_FILE_DONE;
+            }
+            // 3. apply brokerId to controller, and create metadata file
+            if (this.registerState == RegisterState.CREATE_TEMP_METADATA_FILE_DONE) {
+                if (!applyBrokerId()) {
+                    // apply broker id failed, means that this brokerId has been used
+                    // delete temp metadata file
+                    this.tempBrokerMetadata.clear();
+                    // back to the first step
+                    this.registerState = RegisterState.INITIAL;
+                    return false;
+                }
+                if (!createMetadataFileAndDeleteTemp()) {
+                    return false;
+                }
+                this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
+            }
+            // 4. register
+            if (this.registerState == RegisterState.CREATE_METADATA_FILE_DONE) {
+                if (!registerBrokerToController()) {
+                    return false;
+                }
+                this.registerState = RegisterState.REGISTERED;
+            }
             return true;
         } catch (final Exception e) {
             LOGGER.error("Failed to register broker to controller", e);
             return false;
         }
     }
 
+    /**
+     * Send GetNextBrokerRequest to controller for getting next assigning brokerId in this broker-set
+     * @return next brokerId in this broker-set
+     */
+    private Long getNextBrokerId() {
+        try {
+            GetNextBrokerIdResponseHeader nextBrokerIdResp = this.brokerOuterAPI.getNextBrokerId(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.controllerLeaderAddress);
+            return nextBrokerIdResp.getNextBrokerId();
+        } catch (Exception e) {
+            LOGGER.error("fail to get next broker id from controller", e);
+            return null;
+        }
+    }
+
+    /**
+     * Create temp metadata file in local file system, records the brokerId and registerCheckCode
+     * @param brokerId the brokerId that is expected to be assigned
+     * @return whether the temp meta file is created successfully
+     */
+
+    private boolean createTempMetadataFile(Long brokerId) {
+        // generate register check code, format like that: $ipAddress;$timestamp
+        String registerCheckCode = this.brokerAddress + ";" + System.currentTimeMillis();
+        try {
+            this.tempBrokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerId, registerCheckCode);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("update and persist temp broker metadata file failed", e);
+            this.tempBrokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Send applyBrokerId request to controller
+     * @return whether controller has assigned this brokerId for this broker
+     */
+    private boolean applyBrokerId() {
+        try {
+            ApplyBrokerIdResponseHeader response = this.brokerOuterAPI.applyBrokerId(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(),
+                    tempBrokerMetadata.getBrokerId(), tempBrokerMetadata.getRegisterCheckCode(), this.controllerLeaderAddress);
+            return true;
+
+        } catch (Exception e) {
+            LOGGER.error("fail to apply broker id: {}", e, tempBrokerMetadata.getBrokerId());
+            return false;
+        }
+    }
+
+    /**
+     * Create metadata file and delete temp metadata file
+     * @return whether process success
+     */
+    private boolean createMetadataFileAndDeleteTemp() {
+        // create metadata file and delete temp metadata file
+        try {
+            this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
+            this.tempBrokerMetadata.clear();
+            this.brokerControllerId = this.brokerMetadata.getBrokerId();
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("fail to create metadata file", e);
+            this.brokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Send registerSuccess request to inform controller that now broker has been registered successfully and controller should update broker ipAddress if changed
+     * @return whether request success
+     */
+    private boolean registerBrokerToController() {
+        try {
+            RegisterBrokerToControllerResponseHeader response = this.brokerOuterAPI.registerSuccess(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, controllerLeaderAddress);
+            final Long masterBrokerId = response.getMasterBrokerId();
+            final String masterAddress = response.getMasterAddress();
+            if (masterBrokerId == null) {
+                return true;
+            }
+            if (this.brokerControllerId.equals(masterBrokerId)) {
+                changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch());
+            } else {
+                changeToSlave(masterAddress, response.getMasterEpoch(), masterBrokerId);
+            }
+            brokerController.setIsolated(false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("fail to send registerSuccess request to controller", e);
+            return false;
+        }
+    }
+
+    /**
+     * Confirm the registering state now
+     */
+    private void confirmNowRegisteringState() {
+        // 1. check if metadata exist
+        try {
+            this.brokerMetadata.readFromFile();
+        } catch (Exception e) {
+            LOGGER.error("read metadata file failed", e);
+        }
+        if (this.brokerMetadata.isLoaded()) {
+            this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
+            this.brokerControllerId = brokerMetadata.getBrokerId();
+            return;
+        }
+        // 2. check if temp metadata exist
+        try {
+            this.tempBrokerMetadata.readFromFile();
+        } catch (Exception e) {
+            LOGGER.error("read temp metadata file failed", e);
+        }

Review Comment:
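   If the brokerName in brokerConfig differs from the brokerName in the metadata file, I suggest logging a warning (stating the problem and, if the broker is being rebuilt, suggesting that the xxx file be deleted) and then exiting.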
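
A check of the kind suggested here might look like the sketch below. `MetadataConsistencyCheck` and its parameters are hypothetical and are not the PR's code; the caller is assumed to log the message and stop the broker.

```java
// Hypothetical check, not part of RocketMQ: compares the configured broker name with
// the one persisted in the metadata file and fails fast on a mismatch.
final class MetadataConsistencyCheck {
    // persistedName is null when no metadata file has been written yet (first start).
    static void verifyBrokerName(String configuredName, String persistedName, String metadataPath) {
        if (persistedName == null || persistedName.equals(configuredName)) {
            return;
        }
        throw new IllegalStateException(String.format(
            "brokerName in config (%s) differs from brokerName in metadata file (%s); "
                + "if this broker is being rebuilt, delete %s and restart",
            configuredName, persistedName, metadataPath));
    }
}
```

Throwing rather than returning false keeps the mismatch from being retried like an ordinary registration failure.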





[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1133179454


##########
docs/cn/controller/deploy.md:
##########
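@@ -136,3 +136,29 @@ If the Broker sets enableControllerMode=false, it keeps running as before. If
 (2) Upgrading from the original DLedger mode to the Controller switching architecture
 
 Because the message data format of the original DLedger mode differs from that of Master-Slave mode, no in-place upgrade path with data is provided. When multiple Broker groups are deployed, you can disable writes to one Broker group for a while (long enough to confirm that all existing messages have been consumed, for example based on the message retention time), then clear every file under the store directory except config/topics.json and subscriptionGroup.json (keeping the topic and subscription metadata), and upgrade on an empty disk.
+
+### Upgrade notes for the persistent BrokerID version
+
+The current version supports the new high-availability architecture with a persistent BrokerID. Note the following when upgrading from an earlier 5.x release to the current version.
+
+Upgrades from 4.x simply follow the process above.
+Upgrades from a 5.x non-persistent BrokerID version to the persistent BrokerID version follow the process below:
+
+**Upgrading the Controller**
+
+1. Stop the old-version Controller group.
+2. Clear the Controller data, that is, the data files located by default under `~/DLedgerController`.
+3. Bring the new-version Controller group online.
+
+> During the Controller upgrade above, Brokers keep running normally but cannot switch master.
+
+**Upgrading the Broker**
+
+1. Stop the Broker slave nodes.
+2. Stop the Broker master node.
+3. Delete the Epoch files of every Broker, by default `~/store/epochFileCheckpoint` and `~/store/epochFileCheckpoint.bak`.
+4. Bring the former master Broker online first and wait until it is elected master. (Use `getSyncStateSet` from the `admin` commands to observe this.)
+5. Bring all the former slave Brokers online.
+
+> When stopping, stop the slaves before the master; when starting, bring the former master online before the former slaves. This preserves the original master-slave relationship.
+> To change the master-slave relationship across the upgrade, make sure the CommitLogs of the master and the slaves are aligned at shutdown; otherwise data may be truncated and lost.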

Review Comment:
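   I suggest writing a separate document that covers the background of the persistent brokerId scheme (the problem it solves), its design, and the compatible upgrade plan, and then linking to it from the deployment document.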





[GitHub] [rocketmq] TheR1sing3un commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1113850833


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -322,35 +438,150 @@ private boolean registerBrokerToController() {
         }
     }
 
+    /**
+     * Send GetNextBrokerRequest to controller for getting next assigning brokerId in this broker-set
+     * @return next brokerId in this broker-set
+     */
+    private Long getNextBrokerId() {
+        try {
+            GetNextBrokerIdResponseHeader nextBrokerIdResp = this.brokerOuterAPI.getNextBrokerId(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.controllerLeaderAddress);
+            return nextBrokerIdResp.getNextBrokerId();
+        } catch (Exception e) {
+            LOGGER.error("fail to get next broker id from controller", e);
+            return null;
+        }
+    }
+
+    /**
+     * Create temp metadata file in local file system, records the brokerId and registerCheckCode
+     * @param brokerId the brokerId that is expected to be assigned
+     * @return whether the temp meta file is created successfully
+     */
+
+    private boolean createTempMetadataFile(Long brokerId) {
+        // generate register check code, format like that: $ipAddress;$timestamp
+        String registerCheckCode = this.localAddress + ";" + System.currentTimeMillis();
+        try {
+            this.tempBrokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerId, registerCheckCode);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("update and persist temp broker metadata file failed", e);
+            this.tempBrokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Send applyBrokerId request to controller
+     * @return whether controller has assigned this brokerId for this broker
+     */
+    private boolean applyBrokerId() {
+        try {
+            ApplyBrokerIdResponseHeader response = this.brokerOuterAPI.applyBrokerId(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(),
+                    tempBrokerMetadata.getBrokerId(), tempBrokerMetadata.getRegisterCheckCode(), this.controllerLeaderAddress);
+            return true;
+
+        } catch (Exception e) {
+            LOGGER.error("fail to apply broker id: {}", e, tempBrokerMetadata.getBrokerId());
+            return false;
+        }
+    }

Review Comment:
   > Should the response be checked here?
   
   `BrokerOuterAPI` already checks whether the response indicates success.
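
The division of labour described in this reply can be illustrated with a small sketch: the API layer converts any non-success response into an exception, so the caller only needs its `try`/`catch`. All names here (`ResponseCheckSketch`, `ApiException`, the `SUCCESS` value) are hypothetical stand-ins; the real `BrokerOuterAPI` code is not shown in this thread.

```java
// Hypothetical stand-ins for the API layer and its caller.
final class ResponseCheckSketch {
    static final int SUCCESS = 0; // assumed success code for this sketch only

    static final class ApiException extends Exception {
        private static final long serialVersionUID = 1L;

        ApiException(int code, String remark) {
            super("request failed, code=" + code + ", remark=" + remark);
        }
    }

    // API layer: inspects the response code and throws on anything but success.
    static String applyBrokerId(int responseCode, String remark) throws ApiException {
        if (responseCode != SUCCESS) {
            throw new ApiException(responseCode, remark);
        }
        return "applied";
    }

    // Caller: never inspects the response itself; a failure arrives as an exception.
    static boolean tryApply(int responseCode) {
        try {
            applyBrokerId(responseCode, "broker id already in use");
            return true;
        } catch (ApiException e) {
            return false;
        }
    }
}
```

Under this convention, returning `true` straight after the call is sound, because reaching that line already implies a successful response.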





[GitHub] [rocketmq] TheR1sing3un commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1113846632


##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java:
##########
@@ -275,5 +275,11 @@ public class RequestCode {
      */
     public static final int CLEAN_BROKER_DATA = 1011;
 
+    public static final int GET_NEXT_BROKER_ID = 1012;
+
+    public static final int APPLY_BROKER_ID = 1013;
+
+    public static final int REGISTER_SUCCESS = 1014;

Review Comment:
   > 1. I suggest adding a `CONTROLLER_` prefix to each of these RequestCodes.
   > 
   > 2. Can we avoid adding REGISTER_SUCCESS and reuse the existing CONTROLLER_REGISTER_BROKER instead?
   
   1. Done.
   2. I will reuse `CONTROLLER_REGISTER_BROKER` as the request code.





[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1125818318


##########
store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java:
##########
@@ -38,8 +38,13 @@ public class MessageStoreConfig {
 
     //The directory in which the epochFile is kept
     @ImportantField
-    private String storePathEpochFile = System.getProperty("user.home") + File.separator + "store"
-        + File.separator + "epochFileCheckpoint";
+    private String storePathEpochFile = null;
+
+    @ImportantField
+    private String storePathMetadata = null;
+
+    @ImportantField
+    private String storePathTempMetadata = null;

Review Comment:
   I suggest removing this parameter; it is more of an internal implementation detail and does not need to be configured by users.
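
One way to follow this suggestion is to derive the internal file locations from the store root instead of exposing each as a config field. The sketch below assumes exactly that; `StorePathsSketch` is hypothetical, and the metadata file names in it are placeholders rather than the names used by the PR. Only `epochFileCheckpoint` comes from the removed default above.

```java
import java.io.File;

// Hypothetical helper, not part of RocketMQ: internal paths are computed from the
// store root so that users only ever configure the root directory.
final class StorePathsSketch {
    static String epochFile(String storeRoot) {
        return storeRoot + File.separator + "epochFileCheckpoint";
    }

    // Placeholder file name, chosen for this sketch only.
    static String metadataFile(String storeRoot) {
        return storeRoot + File.separator + "brokerMetadata";
    }

    // Placeholder file name, chosen for this sketch only.
    static String tempMetadataFile(String storeRoot) {
        return storeRoot + File.separator + "brokerMetadata.tmp";
    }
}
```

Calling `StorePathsSketch.epochFile(System.getProperty("user.home") + File.separator + "store")` reproduces the default that the diff removes.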



##########
controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java:
##########
@@ -77,100 +81,140 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
                 request);
         }
         switch (request.getCode()) {
-            case CONTROLLER_ALTER_SYNC_STATE_SET: {
-                final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
-                final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet);
-                if (future != null) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
-            }
-            case CONTROLLER_ELECT_MASTER: {
-                final ElectMasterRequestHeader electMasterRequest = (ElectMasterRequestHeader) request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().electMaster(electMasterRequest);
-                if (future != null) {
-                    final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                    final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
-
-                    if (null != responseHeader) {
-                        if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
-                            this.controllerManager.notifyBrokerRoleChanged(responseHeader, electMasterRequest.getClusterName());
-                        }
-                    }
-                    return response;
-                }
-                break;
-            }
-            case CONTROLLER_REGISTER_BROKER: {
-                final RegisterBrokerToControllerRequestHeader controllerRequest = (RegisterBrokerToControllerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().registerBroker(controllerRequest);
-                if (future != null) {
-                    final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                    final RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
-                    if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
-                        this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
-                            responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
-                            controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(), controllerRequest.getElectionPriority());
-                    }
-                    return response;
-                }
-                break;
-            }
-            case CONTROLLER_GET_REPLICA_INFO: {
-                final GetReplicaInfoRequestHeader controllerRequest = (GetReplicaInfoRequestHeader) request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getReplicaInfo(controllerRequest);
-                if (future != null) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
-            }
-            case CONTROLLER_GET_METADATA_INFO: {
-                return this.controllerManager.getController().getControllerMetadata();
-            }
-            case BROKER_HEARTBEAT: {
-                final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
-                this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
-                    requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
-                return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
-            }
-            case CONTROLLER_GET_SYNC_STATE_DATA: {
-                if (request.getBody() != null) {
-                    final List<String> brokerNames = RemotingSerializable.decode(request.getBody(), List.class);
-                    if (brokerNames != null && brokerNames.size() > 0) {
-                        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getSyncStateData(brokerNames);
-                        if (future != null) {
-                            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                        }
-                    }
-                }
-                break;
-            }
+            case CONTROLLER_ALTER_SYNC_STATE_SET:
+                return this.handleAlterSyncStateSet(ctx, request);
+            case CONTROLLER_ELECT_MASTER:
+                return this.handleControllerElectMaster(ctx, request);
+            case CONTROLLER_GET_REPLICA_INFO:
+                return this.handleControllerGetReplicaInfo(ctx, request);
+            case CONTROLLER_GET_METADATA_INFO:
+                return this.handleControllerGetMetadataInfo(ctx, request);
+            case BROKER_HEARTBEAT:
+                return this.handleBrokerHeartbeat(ctx, request);
+            case CONTROLLER_GET_SYNC_STATE_DATA:
+                return this.handleControllerGetSyncStateData(ctx, request);
             case UPDATE_CONTROLLER_CONFIG:
-                return this.updateControllerConfig(ctx, request);
+                return this.handleUpdateControllerConfig(ctx, request);
             case GET_CONTROLLER_CONFIG:
-                return this.getControllerConfig(ctx, request);
+                return this.handleGetControllerConfig(ctx, request);
             case CLEAN_BROKER_DATA:
-                final CleanControllerBrokerDataRequestHeader requestHeader = (CleanControllerBrokerDataRequestHeader) request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().cleanBrokerData(requestHeader);
-                if (null != future) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
+                return this.handleCleanBrokerData(ctx, request);
+            case CONTROLLER_GET_NEXT_BROKER_ID:
+                return this.handleGetNextBrokerId(ctx, request);
+            case CONTROLLER_APPLY_BROKER_ID:
+                return this.handleApplyBrokerId(ctx, request);
+            case CONTROLLER_REGISTER_BROKER:
+                return this.handleRegisterBroker(ctx, request);
             default: {
                 final String error = " request type " + request.getCode() + " not supported";
                 return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
             }
         }
+    }
+
+    private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx,
+        RemotingCommand request) throws Exception {
+        final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
+        final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
         return RemotingCommand.createResponseCommand(null);
     }
 
-    @Override
-    public boolean rejectRequest() {
-        return false;
+    private RemotingCommand handleControllerElectMaster(ChannelHandlerContext ctx,
+        RemotingCommand request) throws Exception {
+        final ElectMasterRequestHeader electMasterRequest = (ElectMasterRequestHeader) request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().electMaster(electMasterRequest);
+        if (future != null) {
+            final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
+                    this.controllerManager.notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(response));
+                }
+            }
+            return response;
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleControllerGetReplicaInfo(ChannelHandlerContext ctx,
+                                                           RemotingCommand request) throws Exception {
+        final GetReplicaInfoRequestHeader controllerRequest = (GetReplicaInfoRequestHeader) request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getReplicaInfo(controllerRequest);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleControllerGetMetadataInfo(ChannelHandlerContext ctx, RemotingCommand request) {
+        return this.controllerManager.getController().getControllerMetadata();
+    }
+
+    private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
+        if (requestHeader.getBrokerId() == null) {
+            return RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST, "Heart beat with empty brokerId");
+        }
+        this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
+                requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
+        return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
+    }
+
+    private RemotingCommand handleControllerGetSyncStateData(ChannelHandlerContext ctx,
+                                                             RemotingCommand request) throws Exception {
+        if (request.getBody() != null) {
+            final List<String> brokerNames = RemotingSerializable.decode(request.getBody(), List.class);
+            if (brokerNames != null && brokerNames.size() > 0) {
+                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getSyncStateData(brokerNames);
+                if (future != null) {
+                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                }
+            }
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleCleanBrokerData(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final CleanControllerBrokerDataRequestHeader requestHeader = (CleanControllerBrokerDataRequestHeader) request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().cleanBrokerData(requestHeader);
+        if (null != future) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
     }
 
-    private RemotingCommand updateControllerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+    private RemotingCommand handleGetNextBrokerId(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final GetNextBrokerIdRequestHeader requestHeader = (GetNextBrokerIdRequestHeader) request.decodeCommandCustomHeader(GetNextBrokerIdRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getNextBrokerId(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleApplyBrokerId(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final ApplyBrokerIdRequestHeader requestHeader = (ApplyBrokerIdRequestHeader) request.decodeCommandCustomHeader(ApplyBrokerIdRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = this.controllerManager.getController().applyBrokerId(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleRegisterBroker(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        RegisterBrokerToControllerRequestHeader requestHeader = (RegisterBrokerToControllerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = this.controllerManager.getController().registerBroker(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }

Review Comment:
   Could a successful registration also be treated as a heartbeat, as it was before?
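
A minimal sketch of what this suggestion could look like: when registration succeeds, the handler also records a heartbeat for the broker. Every type here (`LivenessTable`, `RegisterResult`, `handleRegister`) is a hypothetical stand-in written for illustration, not the PR's `heartbeatManager` or any RocketMQ class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for illustration only; not RocketMQ classes.
class RegisterAsHeartbeatSketch {

    /** Tracks when each broker was last seen, keyed by "brokerName#brokerId". */
    static final class LivenessTable {
        private final Map<String, Long> lastSeenMillis = new ConcurrentHashMap<>();

        void onHeartbeat(String brokerName, long brokerId, long nowMillis) {
            lastSeenMillis.put(brokerName + "#" + brokerId, nowMillis);
        }

        boolean isAlive(String brokerName, long brokerId, long nowMillis, long timeoutMillis) {
            Long seen = lastSeenMillis.get(brokerName + "#" + brokerId);
            return seen != null && nowMillis - seen <= timeoutMillis;
        }
    }

    /** Outcome of a registration attempt (assumed shape). */
    static final class RegisterResult {
        final boolean success;
        final long brokerId;

        RegisterResult(boolean success, long brokerId) {
            this.success = success;
            this.brokerId = brokerId;
        }
    }

    /** Records a heartbeat only when the registration succeeded. */
    static RegisterResult handleRegister(LivenessTable table, String brokerName,
                                         RegisterResult result, long nowMillis) {
        if (result.success) {
            table.onHeartbeat(brokerName, result.brokerId, nowMillis);
        }
        return result;
    }

    public static void main(String[] args) {
        LivenessTable table = new LivenessTable();
        handleRegister(table, "broker-a", new RegisterResult(true, 1L), 1_000L);
        handleRegister(table, "broker-a", new RegisterResult(false, 2L), 1_000L);
        System.out.println(table.isAlive("broker-a", 1L, 1_500L, 3_000L));
        System.out.println(table.isAlive("broker-a", 2L, 1_500L, 3_000L));
    }
}
```

With this shape a broker that has just registered is not considered dead before its first explicit heartbeat arrives, while a failed registration leaves the liveness table untouched.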



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] mxsm commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "mxsm (via GitHub)" <gi...@apache.org>.
mxsm commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1112738599


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -108,10 +124,23 @@ public long getConfirmOffset() {
     enum State {
         INITIAL,
         FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
+
+        FIRST_TIME_REGISTER_TO_CONTROLLER_DONE,
+
         RUNNING,
         SHUTDOWN,
     }

Review Comment:
   Could the enum constant be renamed to something more semantically expressive?



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/TempBrokerMetadata.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class TempBrokerMetadata extends MetadataFile {
+
+    private String clusterName;
+

Review Comment:
   How about having `TempBrokerMetadata` extend `BrokerMetadata`?
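
One way to read this suggestion, as a rough sketch: the temp metadata reuses the cluster name, broker name, and broker id fields from the base class and adds only the register check code. `BrokerMeta` and `TempBrokerMeta` below are hypothetical simplified classes, not the PR's `BrokerMetadata`, `TempBrokerMetadata`, or `MetadataFile`, and file persistence is left out entirely.

```java
// Hypothetical simplified classes for illustration only; persistence is omitted.
class MetadataInheritanceSketch {

    static class BrokerMeta {
        protected String clusterName;
        protected String brokerName;
        protected Long brokerId;

        void update(String clusterName, String brokerName, Long brokerId) {
            this.clusterName = clusterName;
            this.brokerName = brokerName;
            this.brokerId = brokerId;
        }

        boolean isLoaded() {
            return clusterName != null && !clusterName.isEmpty()
                && brokerName != null && !brokerName.isEmpty()
                && brokerId != null;
        }

        void clear() {
            clusterName = null;
            brokerName = null;
            brokerId = null;
        }
    }

    /** Adds only the extra field; shared fields and checks come from the base class. */
    static class TempBrokerMeta extends BrokerMeta {
        private String registerCheckCode;

        void update(String clusterName, String brokerName, Long brokerId, String registerCheckCode) {
            super.update(clusterName, brokerName, brokerId);
            this.registerCheckCode = registerCheckCode;
        }

        String getRegisterCheckCode() {
            return registerCheckCode;
        }

        @Override
        boolean isLoaded() {
            return super.isLoaded() && registerCheckCode != null && !registerCheckCode.isEmpty();
        }

        @Override
        void clear() {
            super.clear();
            registerCheckCode = null;
        }
    }

    public static void main(String[] args) {
        TempBrokerMeta temp = new TempBrokerMeta();
        temp.update("cluster-a", "broker-a", 1L, "127.0.0.1;1676565264000");
        System.out.println(temp.isLoaded());
        temp.clear();
        System.out.println(temp.isLoaded());
    }
}
```

The trade-off is that the inherited three-argument `update` stays callable on the temp object and leaves it without a check code, so `isLoaded` has to guard against that case.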



##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -296,24 +347,89 @@ private void handleSlaveSynchronize(final BrokerRole role) {
         }
     }
 
+    private boolean brokerElect() {
+        // Broker try to elect itself as a master in broker set.
+        try {
+            ElectMasterResponseHeader tryElectResponse = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(),
+                this.brokerConfig.getBrokerName(), this.brokerId);
+            final String masterAddress = tryElectResponse.getMasterAddress();
+            final Long masterBrokerId = tryElectResponse.getMasterBrokerId();
+            if (StringUtils.isEmpty(masterAddress) || masterBrokerId == null) {
+                LOGGER.warn("Now no master in broker set");
+                return false;
+            }
+
+            if (masterBrokerId.equals(this.brokerId)) {
+                changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch());
+            } else {
+                changeToSlave(masterAddress, tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId());
+            }
+            brokerController.setIsolated(false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("Failed to try elect", e);
+            return false;
+        }
+    }
+
+    public void sendHeartbeatToController() {
+        final List<String> controllerAddresses = this.getAvailableControllerAddresses();
+        for (String controllerAddress : controllerAddresses) {
+            if (StringUtils.isNotEmpty(controllerAddress)) {
+                this.brokerOuterAPI.sendHeartbeatToController(
+                        controllerAddress,
+                        this.brokerConfig.getBrokerClusterName(),
+                        this.localAddress,
+                        this.brokerConfig.getBrokerName(),
+                        this.brokerId,

Review Comment:
   How about renaming `this.localAddress` to `this.brokerAddress`?



##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -322,35 +438,150 @@ private boolean registerBrokerToController() {
         }
     }
 
+    /**
+     * Send GetNextBrokerRequest to controller for getting next assigning brokerId in this broker-set
+     * @return next brokerId in this broker-set
+     */
+    private Long getNextBrokerId() {
+        try {
+            GetNextBrokerIdResponseHeader nextBrokerIdResp = this.brokerOuterAPI.getNextBrokerId(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.controllerLeaderAddress);
+            return nextBrokerIdResp.getNextBrokerId();
+        } catch (Exception e) {
+            LOGGER.error("fail to get next broker id from controller", e);
+            return null;
+        }
+    }
+
+    /**
+     * Create temp metadata file in local file system, records the brokerId and registerCheckCode
+     * @param brokerId the brokerId that is expected to be assigned
+     * @return whether the temp meta file is created successfully
+     */
+
+    private boolean createTempMetadataFile(Long brokerId) {
+        // generate register check code, format like that: $ipAddress;$timestamp
+        String registerCheckCode = this.localAddress + ";" + System.currentTimeMillis();
+        try {
+            this.tempBrokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerId, registerCheckCode);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("update and persist temp broker metadata file failed", e);
+            this.tempBrokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Send applyBrokerId request to controller
+     * @return whether controller has assigned this brokerId for this broker
+     */
+    private boolean applyBrokerId() {
+        try {
+            ApplyBrokerIdResponseHeader response = this.brokerOuterAPI.applyBrokerId(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(),
+                    tempBrokerMetadata.getBrokerId(), tempBrokerMetadata.getRegisterCheckCode(), this.controllerLeaderAddress);
+            return true;
+
+        } catch (Exception e) {
+            LOGGER.error("fail to apply broker id: {}", e, tempBrokerMetadata.getBrokerId());
+            return false;
+        }
+    }

Review Comment:
   Shouldn't the response be checked here? The method returns `true` without inspecting `response`.
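
A sketch of the kind of check this comment seems to ask for, assuming the response echoes back the cluster and broker name it was issued for. That shape is an assumption made for illustration: `ApplyResponse` and `isApplyAccepted` are hypothetical names, and the actual fields of `ApplyBrokerIdResponseHeader` are not shown in this thread.

```java
// Hypothetical stand-ins for illustration only; the real response header may differ.
class ApplyBrokerIdCheckSketch {

    /** Assumed response shape: echoes the cluster and broker name it applies to. */
    static final class ApplyResponse {
        final String clusterName;
        final String brokerName;

        ApplyResponse(String clusterName, String brokerName) {
            this.clusterName = clusterName;
            this.brokerName = brokerName;
        }
    }

    /** Returns true only if a response arrived and it matches what this broker asked for. */
    static boolean isApplyAccepted(ApplyResponse response, String expectedCluster, String expectedBrokerName) {
        if (response == null) {
            return false;
        }
        return expectedCluster.equals(response.clusterName)
            && expectedBrokerName.equals(response.brokerName);
    }

    public static void main(String[] args) {
        System.out.println(isApplyAccepted(new ApplyResponse("cluster-a", "broker-a"), "cluster-a", "broker-a"));
        System.out.println(isApplyAccepted(null, "cluster-a", "broker-a"));
    }
}
```

If the call is instead expected to throw on every failure, the alternative is to drop the unused local variable so the code no longer implies that the response matters.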



##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -322,35 +438,150 @@ private boolean registerBrokerToController() {
         }
     }
 
+    /**
+     * Send GetNextBrokerRequest to controller for getting next assigning brokerId in this broker-set
+     * @return next brokerId in this broker-set
+     */
+    private Long getNextBrokerId() {
+        try {
+            GetNextBrokerIdResponseHeader nextBrokerIdResp = this.brokerOuterAPI.getNextBrokerId(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.controllerLeaderAddress);
+            return nextBrokerIdResp.getNextBrokerId();
+        } catch (Exception e) {
+            LOGGER.error("fail to get next broker id from controller", e);
+            return null;
+        }
+    }
+
+    /**
+     * Create temp metadata file in local file system, records the brokerId and registerCheckCode
+     * @param brokerId the brokerId that is expected to be assigned
+     * @return whether the temp meta file is created successfully
+     */
+
+    private boolean createTempMetadataFile(Long brokerId) {
+        // generate register check code, format like that: $ipAddress;$timestamp
+        String registerCheckCode = this.localAddress + ";" + System.currentTimeMillis();
+        try {
+            this.tempBrokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerId, registerCheckCode);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("update and persist temp broker metadata file failed", e);
+            this.tempBrokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Send applyBrokerId request to controller
+     * @return whether controller has assigned this brokerId for this broker
+     */
+    private boolean applyBrokerId() {
+        try {
+            ApplyBrokerIdResponseHeader response = this.brokerOuterAPI.applyBrokerId(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(),
+                    tempBrokerMetadata.getBrokerId(), tempBrokerMetadata.getRegisterCheckCode(), this.controllerLeaderAddress);
+            return true;
+
+        } catch (Exception e) {
+            LOGGER.error("fail to apply broker id: {}", e, tempBrokerMetadata.getBrokerId());
+            return false;
+        }
+    }
+
+    /**
+     * Create metadata file and delete temp metadata file
+     * @return whether process success
+     */
+    private boolean createMetadataFileAndDeleteTemp() {
+        // create metadata file and delete temp metadata file
+        try {
+            this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
+            this.tempBrokerMetadata.clear();
+            this.brokerId = this.brokerMetadata.getBrokerId();
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("fail to create metadata file", e);
+            this.brokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Send registerSuccess request to inform controller that now broker has been registered successfully and controller should update broker ipAddress if changed
+     * @return whether request success
+     */
+    private boolean registerSuccess() {
+        try {
+            RegisterSuccessResponseHeader response = this.brokerOuterAPI.registerSuccess(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerId, localAddress, controllerLeaderAddress);
+            final Long masterBrokerId = response.getMasterBrokerId();
+            final String masterAddress = response.getMasterAddress();
+            if (masterBrokerId == null) {
+                return true;
+            }
+            if (this.brokerId.equals(masterBrokerId)) {
+                changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch());
+            } else {
+                changeToSlave(masterAddress, response.getMasterEpoch(), masterBrokerId);
+            }
+            brokerController.setIsolated(false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("fail to send registerSuccess request to controller", e);
+            return false;
+        }
+    }

Review Comment:
   Consider renaming the `registerSuccess` method, `this.brokerOuterAPI.registerSuccess`, and the request code `REGISTER_SUCCESS`. These names read as if the registration has already succeeded.




[GitHub] [rocketmq] codecov-commenter commented on pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#issuecomment-1439496310

   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/6100?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#6100](https://codecov.io/gh/apache/rocketmq/pull/6100?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (fb514f1) into [develop](https://codecov.io/gh/apache/rocketmq/commit/2dff070a774c0aa529738d9fffba590193453a47?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2dff070) will **decrease** coverage by `0.14%`.
   > The diff coverage is `38.06%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #6100      +/-   ##
   =============================================
   - Coverage      43.25%   43.11%   -0.14%     
   - Complexity      8862     8887      +25     
   =============================================
     Files           1094     1103       +9     
     Lines          77147    77534     +387     
     Branches       10063    10085      +22     
   =============================================
   + Hits           33369    33429      +60     
   - Misses         39616    39928     +312     
   - Partials        4162     4177      +15     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/6100?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...a/org/apache/rocketmq/broker/BrokerController.java](https://codecov.io/gh/apache/rocketmq/pull/6100?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvQnJva2VyQ29udHJvbGxlci5qYXZh) | `43.72% <0.00%> (-2.85%)` | :arrow_down: |
   | [...ocketmq/broker/processor/AdminBrokerProcessor.java](https://codecov.io/gh/apache/rocketmq/pull/6100?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL0FkbWluQnJva2VyUHJvY2Vzc29yLmphdmE=) | `25.31% <0.00%> (ø)` | |
   | [...g/apache/rocketmq/client/impl/MQClientAPIImpl.java](https://codecov.io/gh/apache/rocketmq/pull/6100?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9NUUNsaWVudEFQSUltcGwuamF2YQ==) | `23.00% <0.00%> (+0.03%)` | :arrow_up: |
   | [...ava/org/apache/rocketmq/common/BrokerAddrInfo.java](https://codecov.io/gh/apache/rocketmq/pull/6100?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vQnJva2VyQWRkckluZm8uamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...c/main/java/org/apache/rocketmq/common/MixAll.java](https://codecov.io/gh/apache/rocketmq/pull/6100?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vTWl4QWxsLmphdmE=) | `41.44% <ø> (ø)` | |
   | [...org/apache/rocketmq/controller/BrokerLiveInfo.java](https://codecov.io/gh/apache/rocketmq/pull/6100?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvY29udHJvbGxlci9Ccm9rZXJMaXZlSW5mby5qYXZh) | `48.07% <0.00%> (-10.26%)` | :arrow_down: |
   | [...ocketmq/controller/impl/event/EventSerializer.java](https://codecov.io/gh/apache/rocketmq/pull/6100?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvY29udHJvbGxlci9pbXBsL2V2ZW50L0V2ZW50U2VyaWFsaXplci5qYXZh) | `66.66% <0.00%> (-2.30%)` | :arrow_down: |
   | [...ontroller/impl/event/UpdateBrokerAddressEvent.java](https://codecov.io/gh/apache/rocketmq/pull/6100?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvY29udHJvbGxlci9pbXBsL2V2ZW50L1VwZGF0ZUJyb2tlckFkZHJlc3NFdmVudC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...apache/rocketmq/remoting/protocol/RequestCode.java](https://codecov.io/gh/apache/rocketmq/pull/6100?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL3Byb3RvY29sL1JlcXVlc3RDb2RlLmphdmE=) | `0.00% <ø> (ø)` | |
   | [...pache/rocketmq/remoting/protocol/ResponseCode.java](https://codecov.io/gh/apache/rocketmq/pull/6100?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL3Byb3RvY29sL1Jlc3BvbnNlQ29kZS5qYXZh) | `0.00% <ø> (ø)` | |
   | ... and [81 more](https://codecov.io/gh/apache/rocketmq/pull/6100?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   



[GitHub] [rocketmq] TheR1sing3un commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1113851343


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -77,10 +82,20 @@ public class ReplicasManager {
     private volatile String controllerLeaderAddress = "";
     private volatile State state = State.INITIAL;
 
+    private RegisterState registerState = RegisterState.INITIAL;
+
     private ScheduledFuture<?> checkSyncStateSetTaskFuture;
     private ScheduledFuture<?> slaveSyncFuture;
 
-    private Set<String> syncStateSet;
+    private Long brokerId;

Review Comment:
   > How about calling it brokerControllerId, to distinguish it from brokerId?
   
   Yep, I renamed it!





[GitHub] [rocketmq] TheR1sing3un commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1113845982


##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java:
##########
@@ -29,7 +29,7 @@ public class BrokerHeartbeatRequestHeader implements CommandCustomHeader {
     private String brokerAddr;
     @CFNotNull
     private String brokerName;
-    @CFNullable
+    @CFNotNull

Review Comment:
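   > BrokerHeartbeatRequestHeader is used in the controller, and it is also used in the lightweight heartbeat that the broker sends to the nameserver (see [RIP-32](https://github.com/apache/rocketmq/wiki/RIP-32-Slave-Acting-Master-Mode)). To keep compatibility, I suggest leaving it as CFNullable and having the controller reject requests that carry no brokerId (reminding the user to upgrade).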
   
   done~
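
A minimal sketch of the controller-side check suggested in the quoted comment, assuming the header field stays nullable on the wire. `HeartbeatBrokerIdCheck` and `rejectReason` are hypothetical names used only for illustration; they are not taken from the PR.

```java
import java.util.Optional;

// Hypothetical helper, not part of RocketMQ: the field stays nullable for
// compatibility, and the controller decides whether to accept the request.
final class HeartbeatBrokerIdCheck {
    /**
     * Returns a rejection message when the heartbeat carries no brokerId,
     * or Optional.empty() when the request may proceed.
     */
    static Optional<String> rejectReason(String brokerName, Long brokerId) {
        if (brokerId == null) {
            return Optional.of("Heartbeat from broker " + brokerName
                + " has no brokerId; please upgrade the broker");
        }
        return Optional.empty();
    }
}
```

With this shape, an older broker sending the nameserver heartbeat is unaffected, while the controller can answer an id-less heartbeat with an explicit upgrade hint.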





[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1132512232


##########
controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java:
##########
@@ -351,35 +372,35 @@ public ControllerResult<Void> getSyncStateData(final List<String> brokerNames) {
     }
 
     public ControllerResult<Void> cleanBrokerData(final CleanControllerBrokerDataRequestHeader requestHeader,
-        final BiPredicate<String, String> brokerAlivePredicate) {
+                                                  final BrokerValidPredicate validPredicate) {
         final ControllerResult<Void> result = new ControllerResult<>();
 
         final String clusterName = requestHeader.getClusterName();
         final String brokerName = requestHeader.getBrokerName();
-        final String brokerAddrs = requestHeader.getBrokerAddress();
+        final String brokerIdSetToClean = requestHeader.getBrokerIdSetToClean();
 
-        Set<String> brokerAddressSet = null;
+        Set<Long> brokerIdSet = null;

Review Comment:
   The command uses brokerAddress, but here it has become brokerId. Let's change them all to brokerId.



##########
controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java:
##########
@@ -351,35 +372,35 @@ public ControllerResult<Void> getSyncStateData(final List<String> brokerNames) {
     }
 
     public ControllerResult<Void> cleanBrokerData(final CleanControllerBrokerDataRequestHeader requestHeader,
-        final BiPredicate<String, String> brokerAlivePredicate) {
+                                                  final BrokerValidPredicate validPredicate) {
         final ControllerResult<Void> result = new ControllerResult<>();
 
         final String clusterName = requestHeader.getClusterName();
         final String brokerName = requestHeader.getBrokerName();
-        final String brokerAddrs = requestHeader.getBrokerAddress();
+        final String brokerIdSetToClean = requestHeader.getBrokerIdSetToClean();
 
-        Set<String> brokerAddressSet = null;
+        Set<Long> brokerIdSet = null;
         if (!requestHeader.isCleanLivingBroker()) {
             //if SyncStateInfo.masterAddress is not empty, at least one broker with the same BrokerName is alive
             SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
-            if (StringUtils.isBlank(brokerAddrs) && null != syncStateInfo && StringUtils.isNotEmpty(syncStateInfo.getMasterAddress())) {
+            if (StringUtils.isBlank(brokerIdSetToClean) && null != syncStateInfo && syncStateInfo.getMasterBrokerId() != null) {
                 String remark = String.format("Broker %s is still alive, clean up failure", requestHeader.getBrokerName());
                 result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, remark);
                 return result;
             }
-            if (StringUtils.isNotBlank(brokerAddrs)) {
-                brokerAddressSet = Stream.of(brokerAddrs.split(";")).collect(Collectors.toSet());
-                for (String brokerAddr : brokerAddressSet) {
-                    if (brokerAlivePredicate.test(clusterName, brokerAddr)) {
-                        String remark = String.format("Broker [%s,  %s] is still alive, clean up failure", requestHeader.getBrokerName(), brokerAddr);
+            if (StringUtils.isNotBlank(brokerIdSetToClean)) {
+                brokerIdSet = Stream.of(brokerIdSetToClean.split(";")).map(idStr -> Long.valueOf(idStr)).collect(Collectors.toSet());

Review Comment:
   If these are all brokerIds, should we check that each one is a valid Long?
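
One way the format check raised in this comment could look, as a sketch only. `BrokerIdSetParser` is a hypothetical helper, and skipping empty tokens is an assumption about the desired behavior, not something the PR specifies.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, not part of RocketMQ.
final class BrokerIdSetParser {
    /**
     * Parses a ';'-separated list such as "1;2;3" into a set of ids.
     * Throws IllegalArgumentException that names the offending token
     * instead of letting a bare NumberFormatException escape.
     */
    static Set<Long> parse(String brokerIdSetToClean) {
        Set<Long> ids = new LinkedHashSet<>();
        for (String token : brokerIdSetToClean.split(";")) {
            String trimmed = token.trim();
            if (trimmed.isEmpty()) {
                continue; // assumption: tolerate empty entries such as "1;;2"
            }
            try {
                ids.add(Long.parseLong(trimmed));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid brokerId: '" + trimmed + "'", e);
            }
        }
        return ids;
    }
}
```

A caller in the clean-up path could catch the `IllegalArgumentException` and turn it into an error response, rather than failing inside the stream pipeline.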





[GitHub] [rocketmq] TheR1sing3un commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1126754164


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -296,61 +347,241 @@ private void handleSlaveSynchronize(final BrokerRole role) {
         }
     }
 
-    private boolean registerBrokerToController() {
-        // Register this broker to controller, get brokerId and masterAddress.
+    private boolean brokerElect() {
+        // Broker try to elect itself as a master in broker set.
         try {
-            final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
-                this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getControllerHeartBeatTimeoutMills(),
-                this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset(), this.brokerConfig.getBrokerElectionPriority());
-            final String newMasterAddress = registerResponse.getMasterAddress();
-            if (StringUtils.isNoneEmpty(newMasterAddress)) {
-                if (StringUtils.equals(newMasterAddress, this.localAddress)) {
-                    changeToMaster(registerResponse.getMasterEpoch(), registerResponse.getSyncStateSetEpoch());
-                } else {
-                    changeToSlave(newMasterAddress, registerResponse.getMasterEpoch(), registerResponse.getBrokerId());
-                }
-                // Set isolated to false, make broker can register to namesrv regularly
-                brokerController.setIsolated(false);
-            } else {
-                LOGGER.warn("No master in controller");
+            ElectMasterResponseHeader tryElectResponse = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(),
+                this.brokerConfig.getBrokerName(), this.brokerControllerId);
+            final String masterAddress = tryElectResponse.getMasterAddress();
+            final Long masterBrokerId = tryElectResponse.getMasterBrokerId();
+            if (StringUtils.isEmpty(masterAddress) || masterBrokerId == null) {
+                LOGGER.warn("Now no master in broker set");
                 return false;
             }
+
+            if (masterBrokerId.equals(this.brokerControllerId)) {
+                changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch());
+            } else {
+                changeToSlave(masterAddress, tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId());
+            }
+            brokerController.setIsolated(false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("Failed to try elect", e);
+            return false;
+        }
+    }
+
+    public void sendHeartbeatToController() {
+        final List<String> controllerAddresses = this.getAvailableControllerAddresses();
+        for (String controllerAddress : controllerAddresses) {
+            if (StringUtils.isNotEmpty(controllerAddress)) {
+                this.brokerOuterAPI.sendHeartbeatToController(
+                        controllerAddress,
+                        this.brokerConfig.getBrokerClusterName(),
+                        this.brokerAddress,
+                        this.brokerConfig.getBrokerName(),
+                        this.brokerControllerId,
+                        this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+                        this.brokerConfig.isInBrokerContainer(), this.getLastEpoch(),
+                        this.brokerController.getMessageStore().getMaxPhyOffset(),
+                        this.getConfirmOffset(),
+                        this.brokerConfig.getControllerHeartBeatTimeoutMills(),
+                        this.brokerConfig.getBrokerElectionPriority()
+                );
+            }
+        }
+    }
+
+    /**
+     * Register broker to controller, and persist the metadata to file
+     * @return whether registering process succeeded
+     */
+    private boolean register() {
+        try {
+            // 1. confirm now registering state
+            confirmNowRegisteringState();
+            // 2. get next assigning brokerId, and create temp metadata file
+            if (this.registerState == RegisterState.INITIAL) {
+                Long nextBrokerId = getNextBrokerId();
+                if (nextBrokerId == null || !createTempMetadataFile(nextBrokerId)) {
+                    return false;
+                }
+                this.registerState = RegisterState.CREATE_TEMP_METADATA_FILE_DONE;
+            }
+            // 3. apply brokerId to controller, and create metadata file
+            if (this.registerState == RegisterState.CREATE_TEMP_METADATA_FILE_DONE) {
+                if (!applyBrokerId()) {
+                    // apply broker id failed, means that this brokerId has been used
+                    // delete temp metadata file
+                    this.tempBrokerMetadata.clear();
+                    // back to the first step
+                    this.registerState = RegisterState.INITIAL;
+                    return false;
+                }
+                if (!createMetadataFileAndDeleteTemp()) {
+                    return false;
+                }
+                this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
+            }
+            // 4. register
+            if (this.registerState == RegisterState.CREATE_METADATA_FILE_DONE) {
+                if (!registerBrokerToController()) {
+                    return false;
+                }
+                this.registerState = RegisterState.REGISTERED;
+            }
             return true;
         } catch (final Exception e) {
             LOGGER.error("Failed to register broker to controller", e);
             return false;
         }
     }
 
+    /**
+     * Send GetNextBrokerRequest to controller for getting next assigning brokerId in this broker-set
+     * @return next brokerId in this broker-set
+     */
+    private Long getNextBrokerId() {
+        try {
+            GetNextBrokerIdResponseHeader nextBrokerIdResp = this.brokerOuterAPI.getNextBrokerId(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.controllerLeaderAddress);
+            return nextBrokerIdResp.getNextBrokerId();
+        } catch (Exception e) {
+            LOGGER.error("fail to get next broker id from controller", e);
+            return null;
+        }
+    }
+
+    /**
+     * Create temp metadata file in local file system, records the brokerId and registerCheckCode
+     * @param brokerId the brokerId that is expected to be assigned
+     * @return whether the temp meta file is created successfully
+     */
+
+    private boolean createTempMetadataFile(Long brokerId) {
+        // generate register check code, format like that: $ipAddress;$timestamp
+        String registerCheckCode = this.brokerAddress + ";" + System.currentTimeMillis();
+        try {
+            this.tempBrokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerId, registerCheckCode);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("update and persist temp broker metadata file failed", e);
+            this.tempBrokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Send applyBrokerId request to controller
+     * @return whether controller has assigned this brokerId for this broker
+     */
+    private boolean applyBrokerId() {
+        try {
+            ApplyBrokerIdResponseHeader response = this.brokerOuterAPI.applyBrokerId(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(),
+                    tempBrokerMetadata.getBrokerId(), tempBrokerMetadata.getRegisterCheckCode(), this.controllerLeaderAddress);
+            return true;
+
+        } catch (Exception e) {
+            LOGGER.error("fail to apply broker id: {}", e, tempBrokerMetadata.getBrokerId());
+            return false;
+        }
+    }
+
+    /**
+     * Create metadata file and delete temp metadata file
+     * @return whether process success
+     */
+    private boolean createMetadataFileAndDeleteTemp() {
+        // create metadata file and delete temp metadata file
+        try {
+            this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
+            this.tempBrokerMetadata.clear();
+            this.brokerControllerId = this.brokerMetadata.getBrokerId();
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("fail to create metadata file", e);
+            this.brokerMetadata.clear();
+            return false;
+        }
+    }
+
+    /**
+     * Send registerSuccess request to inform controller that now broker has been registered successfully and controller should update broker ipAddress if changed
+     * @return whether request success
+     */
+    private boolean registerBrokerToController() {
+        try {
+            RegisterBrokerToControllerResponseHeader response = this.brokerOuterAPI.registerSuccess(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, controllerLeaderAddress);
+            final Long masterBrokerId = response.getMasterBrokerId();
+            final String masterAddress = response.getMasterAddress();
+            if (masterBrokerId == null) {
+                return true;
+            }
+            if (this.brokerControllerId.equals(masterBrokerId)) {
+                changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch());
+            } else {
+                changeToSlave(masterAddress, response.getMasterEpoch(), masterBrokerId);
+            }
+            brokerController.setIsolated(false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("fail to send registerSuccess request to controller", e);
+            return false;
+        }
+    }
+
+    /**
+     * Confirm the registering state now
+     */
+    private void confirmNowRegisteringState() {
+        // 1. check if metadata exist
+        try {
+            this.brokerMetadata.readFromFile();
+        } catch (Exception e) {
+            LOGGER.error("read metadata file failed", e);
+        }
+        if (this.brokerMetadata.isLoaded()) {
+            this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
+            this.brokerControllerId = brokerMetadata.getBrokerId();
+            return;
+        }
+        // 2. check if temp metadata exist
+        try {
+            this.tempBrokerMetadata.readFromFile();
+        } catch (Exception e) {
+            LOGGER.error("read temp metadata file failed", e);
+        }

Review Comment:
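   > If the brokerName in brokerConfig does not match the brokerName in the metadata file, I suggest logging a warning (state the problem and, if this is a rebuild, suggest deleting the xxx file) and then exiting.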
   
   done~
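
A rough sketch of the consistency check requested in the quoted comment. `BrokerMetadataCheck`, its method, and the `metadataPath` parameter are hypothetical; how the PR actually reports the mismatch and exits is not shown in this thread.

```java
// Hypothetical helper, not part of RocketMQ.
final class BrokerMetadataCheck {
    /**
     * Fails fast when the broker name persisted in the metadata file differs
     * from the configured one. A null persisted name means no metadata has
     * been written yet, so there is nothing to compare.
     */
    static void requireSameBrokerName(String configuredName, String persistedName, String metadataPath) {
        if (persistedName != null && !persistedName.equals(configuredName)) {
            throw new IllegalStateException("brokerName in config (" + configuredName
                + ") differs from brokerName in metadata (" + persistedName
                + "); if this broker is being rebuilt, delete " + metadataPath + " and restart");
        }
    }
}
```

The caller would log the message and stop the broker instead of continuing to register under a mismatched identity.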



##########
store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java:
##########
@@ -38,8 +38,13 @@ public class MessageStoreConfig {
 
     //The directory in which the epochFile is kept
     @ImportantField
-    private String storePathEpochFile = System.getProperty("user.home") + File.separator + "store"
-        + File.separator + "epochFileCheckpoint";
+    private String storePathEpochFile = null;
+
+    @ImportantField
+    private String storePathMetadata = null;
+
+    @ImportantField
+    private String storePathTempMetadata = null;

Review Comment:
   > I suggest removing this parameter. It is more of an internal implementation detail, and users do not need to configure it.
   
   done~





[GitHub] [rocketmq] TheR1sing3un commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1127721267


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -148,8 +178,24 @@ private boolean startBasicService() {
         }
 
         if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
-            if (registerBrokerToController()) {
-                LOGGER.info("First time register broker success");
+            for (int retryTimes = 0; retryTimes < 5; retryTimes++) {
+                if (register()) {
+                    LOGGER.info("First time register broker success");
+                    this.state = State.REGISTER_TO_CONTROLLER_DONE;
+                    break;
+                }
+            }
+            // register 5 times but still unsuccessful
+            if (this.state != State.REGISTER_TO_CONTROLLER_DONE) {
+                return false;
+            }
+        }
+
+        if (this.state == State.REGISTER_TO_CONTROLLER_DONE) {
+            // The scheduled task for heartbeat sending is not starting now, so we should manually send heartbeat request
+            this.sendHeartbeatToController();
+            if (this.masterBrokerId != null || brokerElect()) {
+                LOGGER.info("Master in this broker set is elected");

Review Comment:
   > 
   
   done~





[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1130519579


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -148,9 +178,26 @@ private boolean startBasicService() {
         }
 
         if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
-            if (registerBrokerToController()) {
-                LOGGER.info("First time register broker success");
+            for (int retryTimes = 0; retryTimes < 5; retryTimes++) {
+                if (register()) {
+                    LOGGER.info("First time register broker success");
+                    this.state = State.REGISTER_TO_CONTROLLER_DONE;
+                    break;
+                }
+            }

Review Comment:
   Is back-to-back retrying really needed here? If registration fails, we could simply wait for startBasicService to be run again.



##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -148,9 +178,26 @@ private boolean startBasicService() {
         }
 
         if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
-            if (registerBrokerToController()) {
-                LOGGER.info("First time register broker success");
+            for (int retryTimes = 0; retryTimes < 5; retryTimes++) {
+                if (register()) {
+                    LOGGER.info("First time register broker success");
+                    this.state = State.REGISTER_TO_CONTROLLER_DONE;
+                    break;
+                }
+            }

Review Comment:
   It would be better to sleep between retries.
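
A small sketch of a bounded retry loop that pauses between attempts, in the spirit of this comment. `RetryWithSleep` and its parameters are hypothetical; the attempt count and delay the PR finally uses are not stated in this thread.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of RocketMQ.
final class RetryWithSleep {
    /**
     * Runs the action up to maxAttempts times, sleeping sleepMillis between
     * failed attempts (but not after the last one). Returns true as soon as
     * the action succeeds, false if every attempt fails or the thread is
     * interrupted while waiting.
     */
    static boolean retry(BooleanSupplier action, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (action.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
            }
        }
        return false;
    }
}
```

The pause gives the controller time to recover between attempts instead of sending five requests in immediate succession.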





[GitHub] [rocketmq] TheR1sing3un commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1113843544


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -296,24 +347,89 @@ private void handleSlaveSynchronize(final BrokerRole role) {
         }
     }
 
+    private boolean brokerElect() {
+        // Broker try to elect itself as a master in broker set.
+        try {
+            ElectMasterResponseHeader tryElectResponse = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(),
+                this.brokerConfig.getBrokerName(), this.brokerId);
+            final String masterAddress = tryElectResponse.getMasterAddress();
+            final Long masterBrokerId = tryElectResponse.getMasterBrokerId();
+            if (StringUtils.isEmpty(masterAddress) || masterBrokerId == null) {
+                LOGGER.warn("Now no master in broker set");
+                return false;
+            }
+
+            if (masterBrokerId.equals(this.brokerId)) {
+                changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch());
+            } else {
+                changeToSlave(masterAddress, tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId());
+            }
+            brokerController.setIsolated(false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("Failed to try elect", e);
+            return false;
+        }
+    }
+
+    public void sendHeartbeatToController() {
+        final List<String> controllerAddresses = this.getAvailableControllerAddresses();
+        for (String controllerAddress : controllerAddresses) {
+            if (StringUtils.isNotEmpty(controllerAddress)) {
+                this.brokerOuterAPI.sendHeartbeatToController(
+                        controllerAddress,
+                        this.brokerConfig.getBrokerClusterName(),
+                        this.localAddress,
+                        this.brokerConfig.getBrokerName(),
+                        this.brokerId,

Review Comment:
   > How about renaming `this.localAddress` to `this.brokerAddress`?
   
   done~





[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1130513885


##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -148,9 +178,26 @@ private boolean startBasicService() {
         }
 
         if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
-            if (registerBrokerToController()) {
-                LOGGER.info("First time register broker success");
+            for (int retryTimes = 0; retryTimes < 5; retryTimes++) {
+                if (register()) {
+                    LOGGER.info("First time register broker success");
+                    this.state = State.REGISTER_TO_CONTROLLER_DONE;
+                    break;
+                }
+            }

Review Comment:
   It would be better to sleep between retries.





[GitHub] [rocketmq] RongtongJin commented on pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#issuecomment-1464825819

   ![image](https://user-images.githubusercontent.com/21963954/224465588-3c9e1bf7-b52e-43ce-96e4-92dcdb0fe1d6.png)
   The UT seems to be unstable




[GitHub] [rocketmq] RongtongJin commented on pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#issuecomment-1467213871

   Rebase and merge this PR because its commit log is clear and huge.




[GitHub] [rocketmq] TheR1sing3un commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "TheR1sing3un (via GitHub)" <gi...@apache.org>.
TheR1sing3un commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1113839525


##########
tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java:
##########
@@ -45,7 +46,7 @@ public Options buildCommandlineOptions(Options options) {
         opt.setRequired(true);
         options.addOption(opt);
 
-        opt = new Option("b", "brokerAddress", true, "The address of the broker which requires to become master");

Review Comment:
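   > Wouldn't keeping brokerAddress be easier to use? Otherwise we need a command that returns the mapping between all addresses and their brokerIds.
   
   I think we can add a command to get that mapping. It is a bit cleaner, and we won't need to change any more logic on the elect-master side.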





[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1112738675


##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java:
##########
@@ -29,7 +29,7 @@ public class BrokerHeartbeatRequestHeader implements CommandCustomHeader {
     private String brokerAddr;
     @CFNotNull
     private String brokerName;
-    @CFNullable
+    @CFNotNull

Review Comment:
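   BrokerHeartbeatRequestHeader is used in the controller, and it is also used in the lightweight heartbeat that the broker sends to the nameserver (see [RIP-32](https://github.com/apache/rocketmq/wiki/RIP-32-Slave-Acting-Master-Mode)). To keep compatibility, I suggest leaving it as CFNullable and having the controller reject requests that carry no brokerId (reminding the user to upgrade).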



##########
tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java:
##########
@@ -45,7 +46,7 @@ public Options buildCommandlineOptions(Options options) {
         opt.setRequired(true);
         options.addOption(opt);
 
-        opt = new Option("b", "brokerAddress", true, "The address of the broker which requires to become master");

Review Comment:
   Wouldn't keeping brokerAddress be easier to use? Otherwise we need a command that returns the mapping between all addresses and their brokerIds.



##########
controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl.manager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+
+/**
+ * Broker replicas info, mapping from brokerAddress to {brokerId, brokerHaAddress}.
+ */
+public class BrokerReplicaInfo {
+    private final String clusterName;
+
+    private final String brokerName;
+
+    // Start from 1
+    private final AtomicLong nextAssignBrokerId;
+
+    private final HashMap<Long/*brokerId*/, Pair<String/*ipAddress*/, String/*registerCheckCode*/>> brokerIdInfo;
+
+    public BrokerReplicaInfo(String clusterName, String brokerName) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.nextAssignBrokerId = new AtomicLong(MixAll.FIRST_SLAVE_ID);

Review Comment:
   Using FIRST_SLAVE_ID here is semantically misleading: this is the brokerControllerId numbering and has nothing to do with slaveId.
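
One way to address this, sketched under the assumption that ids still start from 1 as the `// Start from 1` comment in the diff says. The constant name `FIRST_BROKER_CONTROLLER_ID` and the class `BrokerIdAllocatorSketch` are hypothetical and not part of the PR.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical reduction of BrokerReplicaInfo to just the id counter.
final class BrokerIdAllocatorSketch {
    // Hypothetical dedicated constant, so the starting value does not borrow slave-id semantics.
    static final long FIRST_BROKER_CONTROLLER_ID = 1L;

    private final AtomicLong nextAssignBrokerId = new AtomicLong(FIRST_BROKER_CONTROLLER_ID);

    // Hands out the current id and advances the counter.
    long nextId() {
        return nextAssignBrokerId.getAndIncrement();
    }
}
```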



##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java:
##########
@@ -275,5 +275,11 @@ public class RequestCode {
      */
     public static final int CLEAN_BROKER_DATA = 1011;
 
+    public static final int GET_NEXT_BROKER_ID = 1012;
+
+    public static final int APPLY_BROKER_ID = 1013;
+
+    public static final int REGISTER_SUCCESS = 1014;

Review Comment:
   1. I suggest adding a CONTROLLER_ prefix to each of these RequestCodes.
   
   2. Could we avoid adding REGISTER_SUCCESS and reuse the existing CONTROLLER_REGISTER_BROKER instead?
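
A sketch of how the constants might look after both points are applied, assuming the numeric values from the diff stay unchanged. `RequestCodeSketch` is a hypothetical holder class used only for illustration.

```java
// Hypothetical excerpt; the real constants live in RequestCode.
final class RequestCodeSketch {
    // Prefixed names, values taken from the diff above (assumed unchanged by the rename).
    static final int CONTROLLER_GET_NEXT_BROKER_ID = 1012;
    static final int CONTROLLER_APPLY_BROKER_ID = 1013;

    // No REGISTER_SUCCESS (1014) here: the existing CONTROLLER_REGISTER_BROKER
    // code would be reused for the final registration step.

    private RequestCodeSketch() {
    }
}
```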



##########
broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java:
##########
@@ -77,10 +82,20 @@ public class ReplicasManager {
     private volatile String controllerLeaderAddress = "";
     private volatile State state = State.INITIAL;
 
+    private RegisterState registerState = RegisterState.INITIAL;
+
     private ScheduledFuture<?> checkSyncStateSetTaskFuture;
     private ScheduledFuture<?> slaveSyncFuture;
 
-    private Set<String> syncStateSet;
+    private Long brokerId;

Review Comment:
   How about naming it brokerControllerId, to distinguish it from brokerId?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on code in PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#discussion_r1125893831


##########
controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java:
##########
@@ -77,100 +81,140 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
                 request);
         }
         switch (request.getCode()) {
-            case CONTROLLER_ALTER_SYNC_STATE_SET: {
-                final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
-                final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet);
-                if (future != null) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
-            }
-            case CONTROLLER_ELECT_MASTER: {
-                final ElectMasterRequestHeader electMasterRequest = (ElectMasterRequestHeader) request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().electMaster(electMasterRequest);
-                if (future != null) {
-                    final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                    final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
-
-                    if (null != responseHeader) {
-                        if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
-                            this.controllerManager.notifyBrokerRoleChanged(responseHeader, electMasterRequest.getClusterName());
-                        }
-                    }
-                    return response;
-                }
-                break;
-            }
-            case CONTROLLER_REGISTER_BROKER: {
-                final RegisterBrokerToControllerRequestHeader controllerRequest = (RegisterBrokerToControllerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().registerBroker(controllerRequest);
-                if (future != null) {
-                    final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                    final RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
-                    if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
-                        this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
-                            responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
-                            controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(), controllerRequest.getElectionPriority());
-                    }
-                    return response;
-                }
-                break;
-            }
-            case CONTROLLER_GET_REPLICA_INFO: {
-                final GetReplicaInfoRequestHeader controllerRequest = (GetReplicaInfoRequestHeader) request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getReplicaInfo(controllerRequest);
-                if (future != null) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
-            }
-            case CONTROLLER_GET_METADATA_INFO: {
-                return this.controllerManager.getController().getControllerMetadata();
-            }
-            case BROKER_HEARTBEAT: {
-                final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
-                this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
-                    requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
-                return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
-            }
-            case CONTROLLER_GET_SYNC_STATE_DATA: {
-                if (request.getBody() != null) {
-                    final List<String> brokerNames = RemotingSerializable.decode(request.getBody(), List.class);
-                    if (brokerNames != null && brokerNames.size() > 0) {
-                        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getSyncStateData(brokerNames);
-                        if (future != null) {
-                            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                        }
-                    }
-                }
-                break;
-            }
+            case CONTROLLER_ALTER_SYNC_STATE_SET:
+                return this.handleAlterSyncStateSet(ctx, request);
+            case CONTROLLER_ELECT_MASTER:
+                return this.handleControllerElectMaster(ctx, request);
+            case CONTROLLER_GET_REPLICA_INFO:
+                return this.handleControllerGetReplicaInfo(ctx, request);
+            case CONTROLLER_GET_METADATA_INFO:
+                return this.handleControllerGetMetadataInfo(ctx, request);
+            case BROKER_HEARTBEAT:
+                return this.handleBrokerHeartbeat(ctx, request);
+            case CONTROLLER_GET_SYNC_STATE_DATA:
+                return this.handleControllerGetSyncStateData(ctx, request);
             case UPDATE_CONTROLLER_CONFIG:
-                return this.updateControllerConfig(ctx, request);
+                return this.handleUpdateControllerConfig(ctx, request);
             case GET_CONTROLLER_CONFIG:
-                return this.getControllerConfig(ctx, request);
+                return this.handleGetControllerConfig(ctx, request);
             case CLEAN_BROKER_DATA:
-                final CleanControllerBrokerDataRequestHeader requestHeader = (CleanControllerBrokerDataRequestHeader) request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().cleanBrokerData(requestHeader);
-                if (null != future) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
+                return this.handleCleanBrokerData(ctx, request);
+            case CONTROLLER_GET_NEXT_BROKER_ID:
+                return this.handleGetNextBrokerId(ctx, request);
+            case CONTROLLER_APPLY_BROKER_ID:
+                return this.handleApplyBrokerId(ctx, request);
+            case CONTROLLER_REGISTER_BROKER:
+                return this.handleRegisterBroker(ctx, request);
             default: {
                 final String error = " request type " + request.getCode() + " not supported";
                 return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
             }
         }
+    }
+
+    private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx,
+        RemotingCommand request) throws Exception {
+        final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
+        final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
         return RemotingCommand.createResponseCommand(null);
     }
 
-    @Override
-    public boolean rejectRequest() {
-        return false;
+    private RemotingCommand handleControllerElectMaster(ChannelHandlerContext ctx,
+        RemotingCommand request) throws Exception {
+        final ElectMasterRequestHeader electMasterRequest = (ElectMasterRequestHeader) request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().electMaster(electMasterRequest);
+        if (future != null) {
+            final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
+                    this.controllerManager.notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(response));
+                }
+            }
+            return response;
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleControllerGetReplicaInfo(ChannelHandlerContext ctx,
+                                                           RemotingCommand request) throws Exception {
+        final GetReplicaInfoRequestHeader controllerRequest = (GetReplicaInfoRequestHeader) request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getReplicaInfo(controllerRequest);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleControllerGetMetadataInfo(ChannelHandlerContext ctx, RemotingCommand request) {
+        return this.controllerManager.getController().getControllerMetadata();
+    }
+
+    private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
+        if (requestHeader.getBrokerId() == null) {
+            return RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST, "Heart beat with empty brokerId");
+        }
+        this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
+                requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
+        return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
+    }
+
+    private RemotingCommand handleControllerGetSyncStateData(ChannelHandlerContext ctx,
+                                                             RemotingCommand request) throws Exception {
+        if (request.getBody() != null) {
+            final List<String> brokerNames = RemotingSerializable.decode(request.getBody(), List.class);
+            if (brokerNames != null && brokerNames.size() > 0) {
+                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getSyncStateData(brokerNames);
+                if (future != null) {
+                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                }
+            }
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleCleanBrokerData(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final CleanControllerBrokerDataRequestHeader requestHeader = (CleanControllerBrokerDataRequestHeader) request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().cleanBrokerData(requestHeader);
+        if (null != future) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
     }
 
-    private RemotingCommand updateControllerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+    private RemotingCommand handleGetNextBrokerId(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final GetNextBrokerIdRequestHeader requestHeader = (GetNextBrokerIdRequestHeader) request.decodeCommandCustomHeader(GetNextBrokerIdRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getNextBrokerId(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleApplyBrokerId(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final ApplyBrokerIdRequestHeader requestHeader = (ApplyBrokerIdRequestHeader) request.decodeCommandCustomHeader(ApplyBrokerIdRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = this.controllerManager.getController().applyBrokerId(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleRegisterBroker(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        RegisterBrokerToControllerRequestHeader requestHeader = (RegisterBrokerToControllerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
+        CompletableFuture<RemotingCommand> future = this.controllerManager.getController().registerBroker(requestHeader);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }

Review Comment:
   > 
   
   OK





[GitHub] [rocketmq] RongtongJin commented on pull request #6100: [ISSUE #5989] Support unique broker-id as identification in controller mode

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on PR #6100:
URL: https://github.com/apache/rocketmq/pull/6100#issuecomment-1464859017

   https://github.com/apache/rocketmq/actions/runs/4390865509/jobs/7689623469
   ![image](https://user-images.githubusercontent.com/21963954/224473539-ffc87631-1b6a-4dfc-b7d5-13789dfaae5c.png)
   

