You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/05/11 08:20:13 UTC

[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4272: [Summer of Code] Support switch role for broker

RongtongJin commented on code in PR #4272:
URL: https://github.com/apache/rocketmq/pull/4272#discussion_r869840414


##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/HaControllerProxy.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.hacontroller;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import static org.apache.rocketmq.common.protocol.ResponseCode.CONTROLLER_NOT_LEADER;
+import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
+
+/**
+ * The proxy of controller api.
+ */
+public class HaControllerProxy {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    public static final int RPC_TIME_OUT = 3000;
+    private final RemotingClient remotingClient;
+    private final List<String> controllerAddresses;
+    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ControllerProxy_"));
+    private volatile String controllerLeaderAddress = "";
+
+    public HaControllerProxy(final NettyClientConfig nettyClientConfig, final List<String> controllerAddresses) {
+        this.remotingClient = new NettyRemotingClient(nettyClientConfig);

Review Comment:
   建议API的具体实现还是放到brokerOutAPI,不要新建NettyRemotingClient



##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.hacontroller;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+
+/**
+ * The manager of broker replicas, including:
+ * 1.regularly syncing metadata from controllers, and changing broker roles and master if needed, both master and slave will start this timed task.
+ * 2.regularly expanding and Shrinking syncStateSet, only master will start this timed task.
+ */
+public class ReplicasManager {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final ScheduledExecutorService syncMetadataService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ReplicasManager_SyncMetadata_"));
+    private final ScheduledExecutorService checkSyncStateSetService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ReplicasManager_CheckSyncStateSet_"));
+    private final BrokerController brokerController;
+    private final AutoSwitchHAService haService;
+    private final HaControllerProxy proxy;
+    private final String clusterName;
+    private final String brokerName;
+    private final String localAddress;
+    private final String localHaAddress;
+
+    private ScheduledFuture<?> checkSyncStateSetTaskFuture;
+    private ScheduledFuture<?> slaveSyncFuture;
+
+    private Set<String> syncStateSet;
+    private int syncStateSetEpoch = 0;
+    private BrokerRole currentRole = BrokerRole.SLAVE;
+    private Long brokerId = -1L;

Review Comment:
   这两个为什么不直接用brokerConfig或messageStoreConfig配置里的?如果有多份,可能会有多余的一致性问题



##########
container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java:
##########
@@ -86,7 +86,7 @@ public void run2() {
             }
         }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));
 
-        if (this.brokerConfig.isEnableSlaveActingMaster()) {
+        if (this.brokerConfig.isEnableSlaveActingMaster() || this.messageStoreConfig.isStartupControllerMode()) {

Review Comment:
   建议修改成和BrokerController那一样,不然会调用syncBrokerMemberGroup



##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/HaControllerProxy.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.hacontroller;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import static org.apache.rocketmq.common.protocol.ResponseCode.CONTROLLER_NOT_LEADER;
+import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
+
+/**
+ * The proxy of controller api.
+ */
+public class HaControllerProxy {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    public static final int RPC_TIME_OUT = 3000;
+    private final RemotingClient remotingClient;
+    private final List<String> controllerAddresses;
+    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ControllerProxy_"));
+    private volatile String controllerLeaderAddress = "";
+
+    public HaControllerProxy(final NettyClientConfig nettyClientConfig, final List<String> controllerAddresses) {
+        this.remotingClient = new NettyRemotingClient(nettyClientConfig);
+        this.controllerAddresses = controllerAddresses;
+    }
+
+    public boolean start() {
+        this.remotingClient.start();
+        // Get controller metadata first.
+        int tryTimes = 0;
+        while (tryTimes < 3) {
+            boolean flag = updateControllerMetadata();
+            if (flag) {
+                this.executorService.scheduleAtFixedRate(this::updateControllerMetadata, 0, 2, TimeUnit.SECONDS);
+                return true;
+            }
+            tryTimes ++;
+        }
+        LOGGER.error("Failed to init controller metadata, maybe the controllers in {} is not available", this.controllerAddresses);
+        return false;
+    }
+
+    public void shutdown() {
+        this.remotingClient.shutdown();
+        this.executorService.shutdown();
+    }
+
+    /**
+     * Update controller metadata(leaderAddress)
+     */
+    private boolean updateControllerMetadata() {
+        for (final String address : this.controllerAddresses) {
+            final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_METADATA_INFO, null);
+            try {
+                final RemotingCommand response = this.remotingClient.invokeSync(address, request, RPC_TIME_OUT);
+                if (response.getCode() == SUCCESS) {
+                    final GetMetaDataResponseHeader responseHeader = response.decodeCommandCustomHeader(GetMetaDataResponseHeader.class);
+                    if (responseHeader != null && responseHeader.isLeader()) {
+                        // Because the controller is served externally with the help of name-srv
+                        this.controllerLeaderAddress = address;
+                        LOGGER.info("Change controller leader address to {}", this.controllerAddresses);
+                        return true;
+                    }
+                }
+            } catch (final Exception e) {
+                LOGGER.error("Error happen when pull controller metadata", e);
+                return false;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Alter syncStateSet
+     */
+    public SyncStateSet alterSyncStateSet(String brokerName,
+        final String masterAddress, final int masterEpoch,
+        final Set<String> newSyncStateSet, final int syncStateSetEpoch) throws Exception {
+
+        final AlterSyncStateSetRequestHeader requestHeader = new AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch);
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, requestHeader);
+        request.setBody(new SyncStateSet(newSyncStateSet, syncStateSetEpoch).encode());
+        final RemotingCommand response = this.remotingClient.invokeSync(this.controllerLeaderAddress, request, RPC_TIME_OUT);
+        assert response != null;
+        switch (response.getCode()) {
+            case SUCCESS: {
+                assert response.getBody() != null;
+                return RemotingSerializable.decode(response.getBody(), SyncStateSet.class);
+            }
+            case CONTROLLER_NOT_LEADER: {
+                throw new MQBrokerException(response.getCode(), "Controller leader was changed");
+            }
+        }

Review Comment:
   建议增加更多的response code case,下面也一样



##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.hacontroller;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+
+/**
+ * The manager of broker replicas, including:
+ * 1.regularly syncing metadata from controllers, and changing broker roles and master if needed, both master and slave will start this timed task.
+ * 2.regularly expanding and Shrinking syncStateSet, only master will start this timed task.
+ */
+public class ReplicasManager {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final ScheduledExecutorService syncMetadataService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ReplicasManager_SyncMetadata_"));
+    private final ScheduledExecutorService checkSyncStateSetService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ReplicasManager_CheckSyncStateSet_"));
+    private final BrokerController brokerController;
+    private final AutoSwitchHAService haService;
+    private final HaControllerProxy proxy;
+    private final String clusterName;
+    private final String brokerName;
+    private final String localAddress;
+    private final String localHaAddress;
+
+    private ScheduledFuture<?> checkSyncStateSetTaskFuture;
+    private ScheduledFuture<?> slaveSyncFuture;
+
+    private Set<String> syncStateSet;
+    private int syncStateSetEpoch = 0;
+    private BrokerRole currentRole = BrokerRole.SLAVE;
+    private Long brokerId = -1L;
+    private String masterAddress = "";
+    private int masterEpoch = 0;
+
+    public ReplicasManager(final BrokerController brokerController, final MessageStore messageStore) {

Review Comment:
   brokerController里可以拿到MessageStore,所以入参只需要brokerControlle即可



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/BrokerInfo.java:
##########
@@ -17,23 +17,26 @@
 package org.apache.rocketmq.namesrv.controller.manager;
 
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.Pair;
 
 /**
- * Broker id info, mapping from brokerAddress to brokerId.
+ * Broker info, mapping from brokerAddress to {brokerId, brokerHaAddress}.
  */
-public class BrokerIdInfo {
+public class BrokerInfo {
     private final String clusterName;
     private final String brokerName;
     // Start from 1
     private final AtomicLong brokerIdCount;
-    private final HashMap<String/*Address*/, Long/*brokerId*/> brokerIdTable;
+    private final HashMap<String/*Address*/, Pair<Long/*brokerId*/, String/*HaAddress*/>> brokerTable;

Review Comment:
   不需要返回HaAddress,以NameServer注册为准



##########
broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.hacontroller;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+
+/**
+ * The manager of broker replicas, including:
+ * 1.regularly syncing metadata from controllers, and changing broker roles and master if needed, both master and slave will start this timed task.
+ * 2.regularly expanding and Shrinking syncStateSet, only master will start this timed task.
+ */
+public class ReplicasManager {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final ScheduledExecutorService syncMetadataService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ReplicasManager_SyncMetadata_"));
+    private final ScheduledExecutorService checkSyncStateSetService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ReplicasManager_CheckSyncStateSet_"));
+    private final BrokerController brokerController;
+    private final AutoSwitchHAService haService;
+    private final HaControllerProxy proxy;
+    private final String clusterName;
+    private final String brokerName;
+    private final String localAddress;
+    private final String localHaAddress;
+
+    private ScheduledFuture<?> checkSyncStateSetTaskFuture;
+    private ScheduledFuture<?> slaveSyncFuture;
+
+    private Set<String> syncStateSet;
+    private int syncStateSetEpoch = 0;
+    private BrokerRole currentRole = BrokerRole.SLAVE;
+    private Long brokerId = -1L;
+    private String masterAddress = "";
+    private int masterEpoch = 0;
+
+    public ReplicasManager(final BrokerController brokerController, final MessageStore messageStore) {
+        this.brokerController = brokerController;
+        this.haService = (AutoSwitchHAService) messageStore.getHaService();
+        final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+        final String controllerPaths = brokerConfig.getNamesrvAddr();
+        final String[] controllers = controllerPaths.split(";");
+        assert controllers.length > 0;
+        this.proxy = new HaControllerProxy(brokerController.getNettyClientConfig(), Arrays.asList(controllers));
+        this.syncStateSet = new HashSet<>();
+        this.clusterName = brokerConfig.getBrokerClusterName();
+        this.brokerName = brokerConfig.getBrokerName();
+        this.localAddress = brokerController.getBrokerAddr();
+        this.localHaAddress = brokerController.getHAServerAddr();
+        this.haService.setLocalAddress(this.localAddress);
+    }
+
+    public boolean start() {

Review Comment:
   返回boolean好像并没有用



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org