You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/01/28 02:08:57 UTC
[rocketmq] branch develop updated: GetSyncStateSetSubCommand can also print that the broker is not in syncStateSet (#5935)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0ea930182 GetSyncStateSetSubCommand can also print that the broker is not in syncStateSet (#5935)
0ea930182 is described below
commit 0ea930182c60f2dbb2bf036223d9bd941fcd2f3b
Author: rongtong <ji...@163.com>
AuthorDate: Sat Jan 28 10:08:51 2023 +0800
GetSyncStateSetSubCommand can also print that the broker is not in syncStateSet (#5935)
---
.../rocketmq/client/impl/MQClientAPIImpl.java | 6 +--
.../impl/manager/ReplicasInfoManager.java | 26 ++++++----
...nSyncStateData.java => BrokerReplicasInfo.java} | 60 +++++++++++++---------
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 4 +-
.../tools/admin/DefaultMQAdminExtImpl.java | 4 +-
.../apache/rocketmq/tools/admin/MQAdminExt.java | 4 +-
.../command/ha/GetSyncStateSetSubCommand.java | 23 +++++----
7 files changed, 75 insertions(+), 52 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index f251e2b00..8347f3653 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -109,7 +109,7 @@ import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody;
@@ -2982,7 +2982,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public InSyncStateData getInSyncStateData(final String controllerAddress,
+ public BrokerReplicasInfo getInSyncStateData(final String controllerAddress,
final List<String> brokers) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingCommandException {
// Get controller leader address.
final GetMetaDataResponseHeader controllerMetaData = getControllerMetaData(controllerAddress);
@@ -2997,7 +2997,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
- return RemotingSerializable.decode(response.getBody(), InSyncStateData.class);
+ return RemotingSerializable.decode(response.getBody(), BrokerReplicasInfo.class);
}
default:
break;
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index a820b069e..1c5a805b6 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -41,7 +41,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetResponseHeader;
@@ -264,7 +264,7 @@ public class ReplicasInfoManager {
canBeElectedAsMaster = syncStateInfo.getSyncStateSet().contains(brokerAddress) || this.controllerConfig.isEnableElectUncleanMaster();
}
if (!canBeElectedAsMaster) {
- // still need to apply an ElectMasterEvent to tell the statemachine
+ // still need to apply an ElectMasterEvent to tell the statemachine
// that the master was shutdown and no new master was elected. set SyncStateInfo.masterAddress empty
final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
result.addEvent(event);
@@ -322,7 +322,7 @@ public class ReplicasInfoManager {
public ControllerResult<Void> getSyncStateData(final List<String> brokerNames) {
final ControllerResult<Void> result = new ControllerResult<>();
- final InSyncStateData inSyncStateData = new InSyncStateData();
+ final BrokerReplicasInfo brokerReplicasInfo = new BrokerReplicasInfo();
for (String brokerName : brokerNames) {
if (isContainsBroker(brokerName)) {
// If exist broker metadata, just return metadata
@@ -330,17 +330,23 @@ public class ReplicasInfoManager {
final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
final Set<String> syncStateSet = syncStateInfo.getSyncStateSet();
final String master = syncStateInfo.getMasterAddress();
- final ArrayList<InSyncStateData.InSyncMember> inSyncMembers = new ArrayList<>();
- syncStateSet.forEach(replicas -> {
- long brokerId = StringUtils.equals(master, replicas) ? MixAll.MASTER_ID : brokerInfo.getBrokerId(replicas);
- inSyncMembers.add(new InSyncStateData.InSyncMember(replicas, brokerId));
+ final ArrayList<BrokerReplicasInfo.ReplicaIdentity> inSyncReplicas = new ArrayList<>();
+ final ArrayList<BrokerReplicasInfo.ReplicaIdentity> notInSyncReplicas = new ArrayList<>();
+
+ brokerInfo.getBrokerIdTable().forEach((brokerAddress, brokerId) -> {
+ if (syncStateSet.contains(brokerAddress)) {
+ long id = StringUtils.equals(master, brokerAddress) ? MixAll.MASTER_ID : brokerInfo.getBrokerId(brokerAddress);
+ inSyncReplicas.add(new BrokerReplicasInfo.ReplicaIdentity(brokerAddress, id));
+ } else {
+ notInSyncReplicas.add(new BrokerReplicasInfo.ReplicaIdentity(brokerAddress, brokerId));
+ }
});
- final InSyncStateData.InSyncStateSet inSyncState = new InSyncStateData.InSyncStateSet(master, syncStateInfo.getMasterEpoch(), syncStateInfo.getSyncStateSetEpoch(), inSyncMembers);
- inSyncStateData.addInSyncState(brokerName, inSyncState);
+ final BrokerReplicasInfo.ReplicasInfo inSyncState = new BrokerReplicasInfo.ReplicasInfo(master, syncStateInfo.getMasterEpoch(), syncStateInfo.getSyncStateSetEpoch(), inSyncReplicas, notInSyncReplicas);
+ brokerReplicasInfo.addReplicaInfo(brokerName, inSyncState);
}
}
- result.setBody(inSyncStateData.encode());
+ result.setBody(brokerReplicasInfo.encode());
return result;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/InSyncStateData.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
similarity index 59%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/InSyncStateData.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
index 2496f260a..fece50d2e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/InSyncStateData.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
@@ -21,38 +21,41 @@ import java.util.List;
import java.util.Map;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-public class InSyncStateData extends RemotingSerializable {
- private Map<String/*brokerName*/, InSyncStateSet> inSyncStateTable;
+public class BrokerReplicasInfo extends RemotingSerializable {
+ private Map<String/*brokerName*/, ReplicasInfo> replicasInfoTable;
- public InSyncStateData() {
- this.inSyncStateTable = new HashMap<>();
+ public BrokerReplicasInfo() {
+ this.replicasInfoTable = new HashMap<>();
}
- public void addInSyncState(final String brokerName, final InSyncStateSet inSyncState) {
- this.inSyncStateTable.put(brokerName, inSyncState);
+ public void addReplicaInfo(final String brokerName, final ReplicasInfo replicasInfo) {
+ this.replicasInfoTable.put(brokerName, replicasInfo);
}
- public Map<String, InSyncStateSet> getInSyncStateTable() {
- return inSyncStateTable;
+ public Map<String, ReplicasInfo> getReplicasInfoTable() {
+ return replicasInfoTable;
}
- public void setInSyncStateTable(
- Map<String, InSyncStateSet> inSyncStateTable) {
- this.inSyncStateTable = inSyncStateTable;
+ public void setReplicasInfoTable(
+ Map<String, ReplicasInfo> replicasInfoTable) {
+ this.replicasInfoTable = replicasInfoTable;
}
- public static class InSyncStateSet extends RemotingSerializable {
+ public static class ReplicasInfo extends RemotingSerializable {
private String masterAddress;
private int masterEpoch;
private int syncStateSetEpoch;
- private List<InSyncMember> inSyncMembers;
+ private List<ReplicaIdentity> inSyncReplicas;
+ private List<ReplicaIdentity> notInSyncReplicas;
- public InSyncStateSet(String masterAddress, int masterEpoch, int syncStateSetEpoch,
- List<InSyncMember> inSyncMembers) {
+ public ReplicasInfo(String masterAddress, int masterEpoch, int syncStateSetEpoch,
+ List<ReplicaIdentity> inSyncReplicas,
+ List<ReplicaIdentity> notInSyncReplicas) {
this.masterAddress = masterAddress;
this.masterEpoch = masterEpoch;
this.syncStateSetEpoch = syncStateSetEpoch;
- this.inSyncMembers = inSyncMembers;
+ this.inSyncReplicas = inSyncReplicas;
+ this.notInSyncReplicas = notInSyncReplicas;
}
public String getMasterAddress() {
@@ -79,21 +82,30 @@ public class InSyncStateData extends RemotingSerializable {
this.syncStateSetEpoch = syncStateSetEpoch;
}
- public List<InSyncMember> getInSyncMembers() {
- return inSyncMembers;
+ public List<ReplicaIdentity> getInSyncReplicas() {
+ return inSyncReplicas;
}
- public void setInSyncMembers(
- List<InSyncMember> inSyncMembers) {
- this.inSyncMembers = inSyncMembers;
+ public void setInSyncReplicas(
+ List<ReplicaIdentity> inSyncReplicas) {
+ this.inSyncReplicas = inSyncReplicas;
+ }
+
+ public List<ReplicaIdentity> getNotInSyncReplicas() {
+ return notInSyncReplicas;
+ }
+
+ public void setNotInSyncReplicas(
+ List<ReplicaIdentity> notInSyncReplicas) {
+ this.notInSyncReplicas = notInSyncReplicas;
}
}
- public static class InSyncMember extends RemotingSerializable {
+ public static class ReplicaIdentity extends RemotingSerializable {
private String address;
private Long brokerId;
- public InSyncMember(String address, Long brokerId) {
+ public ReplicaIdentity(String address, Long brokerId) {
this.address = address;
this.brokerId = brokerId;
}
@@ -116,7 +128,7 @@ public class InSyncStateData extends RemotingSerializable {
@Override
public String toString() {
- return "InSyncMember{" +
+ return "{" +
"address='" + address + '\'' +
", brokerId=" + brokerId +
'}';
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 7bc308036..f70580dc6 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -51,7 +51,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
@@ -776,7 +776,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public InSyncStateData getInSyncStateData(String controllerAddress,
+ public BrokerReplicasInfo getInSyncStateData(String controllerAddress,
List<String> brokers) throws RemotingException, InterruptedException, MQBrokerException {
return this.defaultMQAdminExtImpl.getInSyncStateData(controllerAddress, brokers);
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 0460ed95b..fc3e079fe 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -86,7 +86,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
@@ -1819,7 +1819,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public InSyncStateData getInSyncStateData(String controllerAddress,
+ public BrokerReplicasInfo getInSyncStateData(String controllerAddress,
List<String> brokers) throws RemotingException, InterruptedException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getInSyncStateData(controllerAddress, brokers);
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index ebf878f32..2d19af5f2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -48,7 +48,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
@@ -411,7 +411,7 @@ public interface MQAdminExt extends MQAdmin {
HARuntimeInfo getBrokerHAStatus(String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException;
- InSyncStateData getInSyncStateData(String controllerAddress,
+ BrokerReplicasInfo getInSyncStateData(String controllerAddress,
List<String> brokers) throws RemotingException, InterruptedException, MQBrokerException;
EpochEntryCache getBrokerEpochCache(
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java
index e9699e713..252dd99fb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java
@@ -24,7 +24,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
@@ -114,15 +114,20 @@ public class GetSyncStateSetSubCommand implements SubCommand {
private void printData(String controllerAddress, List<String> brokerNames,
DefaultMQAdminExt defaultMQAdminExt) throws Exception {
if (brokerNames.size() > 0) {
- final InSyncStateData syncStateData = defaultMQAdminExt.getInSyncStateData(controllerAddress, brokerNames);
- final Map<String, InSyncStateData.InSyncStateSet> syncTable = syncStateData.getInSyncStateTable();
- for (Map.Entry<String, InSyncStateData.InSyncStateSet> next : syncTable.entrySet()) {
- final List<InSyncStateData.InSyncMember> syncMembers = next.getValue().getInSyncMembers();
- System.out.printf("\n#brokerName\t%s\n#MasterAddr\t%s\n#MasterEpoch\t%d\n#SyncStateSetEpoch\t%d\n#SyncStateSetMemberNums\t%d\n",
+ final BrokerReplicasInfo brokerReplicasInfo = defaultMQAdminExt.getInSyncStateData(controllerAddress, brokerNames);
+ final Map<String, BrokerReplicasInfo.ReplicasInfo> replicasInfoTable = brokerReplicasInfo.getReplicasInfoTable();
+ for (Map.Entry<String, BrokerReplicasInfo.ReplicasInfo> next : replicasInfoTable.entrySet()) {
+ final List<BrokerReplicasInfo.ReplicaIdentity> inSyncReplicas = next.getValue().getInSyncReplicas();
+ final List<BrokerReplicasInfo.ReplicaIdentity> notInSyncReplicas = next.getValue().getNotInSyncReplicas();
+ System.out.printf("\n#brokerName\t%s\n#MasterAddr\t%s\n#MasterEpoch\t%d\n#SyncStateSetEpoch\t%d\n#SyncStateSetNums\t%d\n",
next.getKey(), next.getValue().getMasterAddress(), next.getValue().getMasterEpoch(), next.getValue().getSyncStateSetEpoch(),
- syncMembers.size());
- for (InSyncStateData.InSyncMember member : syncMembers) {
- System.out.printf("\n member:\t%s\n", member.toString());
+ inSyncReplicas.size());
+ for (BrokerReplicasInfo.ReplicaIdentity member : inSyncReplicas) {
+ System.out.printf("\n InSyncReplica:\t%s\n", member.toString());
+ }
+
+ for (BrokerReplicasInfo.ReplicaIdentity member : notInSyncReplicas) {
+ System.out.printf("\n NotInSyncReplica:\t%s\n", member.toString());
}
}
}