You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/01/28 02:08:57 UTC

[rocketmq] branch develop updated: GetSyncStateSetSubCommand can also print that the broker is not in syncStateSet (#5935)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0ea930182 GetSyncStateSetSubCommand can also print that the broker is not in syncStateSet (#5935)
0ea930182 is described below

commit 0ea930182c60f2dbb2bf036223d9bd941fcd2f3b
Author: rongtong <ji...@163.com>
AuthorDate: Sat Jan 28 10:08:51 2023 +0800

    GetSyncStateSetSubCommand can also print that the broker is not in syncStateSet (#5935)
---
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  6 +--
 .../impl/manager/ReplicasInfoManager.java          | 26 ++++++----
 ...nSyncStateData.java => BrokerReplicasInfo.java} | 60 +++++++++++++---------
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |  4 +-
 .../tools/admin/DefaultMQAdminExtImpl.java         |  4 +-
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |  4 +-
 .../command/ha/GetSyncStateSetSubCommand.java      | 23 +++++----
 7 files changed, 75 insertions(+), 52 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index f251e2b00..8347f3653 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -109,7 +109,7 @@ import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
 import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody;
 import org.apache.rocketmq.remoting.protocol.body.GroupList;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
 import org.apache.rocketmq.remoting.protocol.body.KVTable;
 import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody;
@@ -2982,7 +2982,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public InSyncStateData getInSyncStateData(final String controllerAddress,
+    public BrokerReplicasInfo getInSyncStateData(final String controllerAddress,
         final List<String> brokers) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingCommandException {
         // Get controller leader address.
         final GetMetaDataResponseHeader controllerMetaData = getControllerMetaData(controllerAddress);
@@ -2997,7 +2997,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
-                return RemotingSerializable.decode(response.getBody(), InSyncStateData.class);
+                return RemotingSerializable.decode(response.getBody(), BrokerReplicasInfo.class);
             }
             default:
                 break;
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index a820b069e..1c5a805b6 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -41,7 +41,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
 import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetResponseHeader;
@@ -264,7 +264,7 @@ public class ReplicasInfoManager {
                     canBeElectedAsMaster = syncStateInfo.getSyncStateSet().contains(brokerAddress) || this.controllerConfig.isEnableElectUncleanMaster();
                 }
                 if (!canBeElectedAsMaster) {
-                     // still need to apply an ElectMasterEvent to tell the statemachine
+                    // still need to apply an ElectMasterEvent to tell the statemachine
                     // that the master was shutdown and no new master was elected. set SyncStateInfo.masterAddress empty
                     final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
                     result.addEvent(event);
@@ -322,7 +322,7 @@ public class ReplicasInfoManager {
 
     public ControllerResult<Void> getSyncStateData(final List<String> brokerNames) {
         final ControllerResult<Void> result = new ControllerResult<>();
-        final InSyncStateData inSyncStateData = new InSyncStateData();
+        final BrokerReplicasInfo brokerReplicasInfo = new BrokerReplicasInfo();
         for (String brokerName : brokerNames) {
             if (isContainsBroker(brokerName)) {
                 // If exist broker metadata, just return metadata
@@ -330,17 +330,23 @@ public class ReplicasInfoManager {
                 final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
                 final Set<String> syncStateSet = syncStateInfo.getSyncStateSet();
                 final String master = syncStateInfo.getMasterAddress();
-                final ArrayList<InSyncStateData.InSyncMember> inSyncMembers = new ArrayList<>();
-                syncStateSet.forEach(replicas -> {
-                    long brokerId = StringUtils.equals(master, replicas) ? MixAll.MASTER_ID : brokerInfo.getBrokerId(replicas);
-                    inSyncMembers.add(new InSyncStateData.InSyncMember(replicas, brokerId));
+                final ArrayList<BrokerReplicasInfo.ReplicaIdentity> inSyncReplicas = new ArrayList<>();
+                final ArrayList<BrokerReplicasInfo.ReplicaIdentity> notInSyncReplicas = new ArrayList<>();
+
+                brokerInfo.getBrokerIdTable().forEach((brokerAddress, brokerId) -> {
+                    if (syncStateSet.contains(brokerAddress)) {
+                        long id = StringUtils.equals(master, brokerAddress) ? MixAll.MASTER_ID : brokerInfo.getBrokerId(brokerAddress);
+                        inSyncReplicas.add(new BrokerReplicasInfo.ReplicaIdentity(brokerAddress, id));
+                    } else {
+                        notInSyncReplicas.add(new BrokerReplicasInfo.ReplicaIdentity(brokerAddress, brokerId));
+                    }
                 });
 
-                final InSyncStateData.InSyncStateSet inSyncState = new InSyncStateData.InSyncStateSet(master, syncStateInfo.getMasterEpoch(), syncStateInfo.getSyncStateSetEpoch(), inSyncMembers);
-                inSyncStateData.addInSyncState(brokerName, inSyncState);
+                final BrokerReplicasInfo.ReplicasInfo inSyncState = new BrokerReplicasInfo.ReplicasInfo(master, syncStateInfo.getMasterEpoch(), syncStateInfo.getSyncStateSetEpoch(), inSyncReplicas, notInSyncReplicas);
+                brokerReplicasInfo.addReplicaInfo(brokerName, inSyncState);
             }
         }
-        result.setBody(inSyncStateData.encode());
+        result.setBody(brokerReplicasInfo.encode());
         return result;
     }
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/InSyncStateData.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
similarity index 59%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/InSyncStateData.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
index 2496f260a..fece50d2e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/InSyncStateData.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
@@ -21,38 +21,41 @@ import java.util.List;
 import java.util.Map;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-public class InSyncStateData extends RemotingSerializable  {
-    private Map<String/*brokerName*/, InSyncStateSet> inSyncStateTable;
+public class BrokerReplicasInfo extends RemotingSerializable  {
+    private Map<String/*brokerName*/, ReplicasInfo> replicasInfoTable;
 
-    public InSyncStateData() {
-        this.inSyncStateTable = new HashMap<>();
+    public BrokerReplicasInfo() {
+        this.replicasInfoTable = new HashMap<>();
     }
 
-    public void addInSyncState(final String brokerName, final InSyncStateSet inSyncState) {
-        this.inSyncStateTable.put(brokerName, inSyncState);
+    public void addReplicaInfo(final String brokerName, final ReplicasInfo replicasInfo) {
+        this.replicasInfoTable.put(brokerName, replicasInfo);
     }
 
-    public Map<String, InSyncStateSet> getInSyncStateTable() {
-        return inSyncStateTable;
+    public Map<String, ReplicasInfo> getReplicasInfoTable() {
+        return replicasInfoTable;
     }
 
-    public void setInSyncStateTable(
-        Map<String, InSyncStateSet> inSyncStateTable) {
-        this.inSyncStateTable = inSyncStateTable;
+    public void setReplicasInfoTable(
+        Map<String, ReplicasInfo> replicasInfoTable) {
+        this.replicasInfoTable = replicasInfoTable;
     }
 
-    public static class InSyncStateSet extends RemotingSerializable {
+    public static class ReplicasInfo extends RemotingSerializable {
         private String masterAddress;
         private int masterEpoch;
         private int syncStateSetEpoch;
-        private List<InSyncMember> inSyncMembers;
+        private List<ReplicaIdentity> inSyncReplicas;
+        private List<ReplicaIdentity> notInSyncReplicas;
 
-        public InSyncStateSet(String masterAddress, int masterEpoch, int syncStateSetEpoch,
-            List<InSyncMember> inSyncMembers) {
+        public ReplicasInfo(String masterAddress, int masterEpoch, int syncStateSetEpoch,
+            List<ReplicaIdentity> inSyncReplicas,
+            List<ReplicaIdentity> notInSyncReplicas) {
             this.masterAddress = masterAddress;
             this.masterEpoch = masterEpoch;
             this.syncStateSetEpoch = syncStateSetEpoch;
-            this.inSyncMembers = inSyncMembers;
+            this.inSyncReplicas = inSyncReplicas;
+            this.notInSyncReplicas = notInSyncReplicas;
         }
 
         public String getMasterAddress() {
@@ -79,21 +82,30 @@ public class InSyncStateData extends RemotingSerializable  {
             this.syncStateSetEpoch = syncStateSetEpoch;
         }
 
-        public List<InSyncMember> getInSyncMembers() {
-            return inSyncMembers;
+        public List<ReplicaIdentity> getInSyncReplicas() {
+            return inSyncReplicas;
         }
 
-        public void setInSyncMembers(
-            List<InSyncMember> inSyncMembers) {
-            this.inSyncMembers = inSyncMembers;
+        public void setInSyncReplicas(
+            List<ReplicaIdentity> inSyncReplicas) {
+            this.inSyncReplicas = inSyncReplicas;
+        }
+
+        public List<ReplicaIdentity> getNotInSyncReplicas() {
+            return notInSyncReplicas;
+        }
+
+        public void setNotInSyncReplicas(
+            List<ReplicaIdentity> notInSyncReplicas) {
+            this.notInSyncReplicas = notInSyncReplicas;
         }
     }
 
-    public static class InSyncMember extends RemotingSerializable {
+    public static class ReplicaIdentity extends RemotingSerializable {
         private String address;
         private Long brokerId;
 
-        public InSyncMember(String address, Long brokerId) {
+        public ReplicaIdentity(String address, Long brokerId) {
             this.address = address;
             this.brokerId = brokerId;
         }
@@ -116,7 +128,7 @@ public class InSyncStateData extends RemotingSerializable  {
 
         @Override
         public String toString() {
-            return "InSyncMember{" +
+            return "{" +
                 "address='" + address + '\'' +
                 ", brokerId=" + brokerId +
                 '}';
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 7bc308036..f70580dc6 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -51,7 +51,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
 import org.apache.rocketmq.remoting.protocol.body.GroupList;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
 import org.apache.rocketmq.remoting.protocol.body.KVTable;
 import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
 import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
@@ -776,7 +776,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
-    public InSyncStateData getInSyncStateData(String controllerAddress,
+    public BrokerReplicasInfo getInSyncStateData(String controllerAddress,
         List<String> brokers) throws RemotingException, InterruptedException, MQBrokerException {
         return this.defaultMQAdminExtImpl.getInSyncStateData(controllerAddress, brokers);
     }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 0460ed95b..fc3e079fe 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -86,7 +86,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
 import org.apache.rocketmq.remoting.protocol.body.GroupList;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
 import org.apache.rocketmq.remoting.protocol.body.KVTable;
 import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
 import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
@@ -1819,7 +1819,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     }
 
     @Override
-    public InSyncStateData getInSyncStateData(String controllerAddress,
+    public BrokerReplicasInfo getInSyncStateData(String controllerAddress,
         List<String> brokers) throws RemotingException, InterruptedException, MQBrokerException {
         return this.mqClientInstance.getMQClientAPIImpl().getInSyncStateData(controllerAddress, brokers);
     }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index ebf878f32..2d19af5f2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -48,7 +48,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
 import org.apache.rocketmq.remoting.protocol.body.GroupList;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
 import org.apache.rocketmq.remoting.protocol.body.KVTable;
 import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
 import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
@@ -411,7 +411,7 @@ public interface MQAdminExt extends MQAdmin {
     HARuntimeInfo getBrokerHAStatus(String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
         RemotingTimeoutException, InterruptedException, MQBrokerException;
 
-    InSyncStateData getInSyncStateData(String controllerAddress,
+    BrokerReplicasInfo getInSyncStateData(String controllerAddress,
         List<String> brokers) throws RemotingException, InterruptedException, MQBrokerException;
 
     EpochEntryCache getBrokerEpochCache(
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java
index e9699e713..252dd99fb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java
@@ -24,7 +24,7 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.CommandUtil;
@@ -114,15 +114,20 @@ public class GetSyncStateSetSubCommand implements SubCommand {
     private void printData(String controllerAddress, List<String> brokerNames,
         DefaultMQAdminExt defaultMQAdminExt) throws Exception {
         if (brokerNames.size() > 0) {
-            final InSyncStateData syncStateData = defaultMQAdminExt.getInSyncStateData(controllerAddress, brokerNames);
-            final Map<String, InSyncStateData.InSyncStateSet> syncTable = syncStateData.getInSyncStateTable();
-            for (Map.Entry<String, InSyncStateData.InSyncStateSet> next : syncTable.entrySet()) {
-                final List<InSyncStateData.InSyncMember> syncMembers = next.getValue().getInSyncMembers();
-                System.out.printf("\n#brokerName\t%s\n#MasterAddr\t%s\n#MasterEpoch\t%d\n#SyncStateSetEpoch\t%d\n#SyncStateSetMemberNums\t%d\n",
+            final BrokerReplicasInfo brokerReplicasInfo = defaultMQAdminExt.getInSyncStateData(controllerAddress, brokerNames);
+            final Map<String, BrokerReplicasInfo.ReplicasInfo> replicasInfoTable = brokerReplicasInfo.getReplicasInfoTable();
+            for (Map.Entry<String, BrokerReplicasInfo.ReplicasInfo> next : replicasInfoTable.entrySet()) {
+                final List<BrokerReplicasInfo.ReplicaIdentity> inSyncReplicas = next.getValue().getInSyncReplicas();
+                final List<BrokerReplicasInfo.ReplicaIdentity> notInSyncReplicas = next.getValue().getNotInSyncReplicas();
+                System.out.printf("\n#brokerName\t%s\n#MasterAddr\t%s\n#MasterEpoch\t%d\n#SyncStateSetEpoch\t%d\n#SyncStateSetNums\t%d\n",
                     next.getKey(), next.getValue().getMasterAddress(), next.getValue().getMasterEpoch(), next.getValue().getSyncStateSetEpoch(),
-                    syncMembers.size());
-                for (InSyncStateData.InSyncMember member : syncMembers) {
-                    System.out.printf("\n member:\t%s\n", member.toString());
+                    inSyncReplicas.size());
+                for (BrokerReplicasInfo.ReplicaIdentity member : inSyncReplicas) {
+                    System.out.printf("\n InSyncReplica:\t%s\n", member.toString());
+                }
+
+                for (BrokerReplicasInfo.ReplicaIdentity member : notInSyncReplicas) {
+                    System.out.printf("\n NotInSyncReplica:\t%s\n", member.toString());
                 }
             }
         }