You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/02/07 01:14:39 UTC
[rocketmq] 13/14: fix(controller): fix some bugs to pass AutoSwitchRoleIntegrationTest
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 3951411ef7a33e0018568242ea3bdcb73801b035
Author: TheR1sing3un <th...@163.com>
AuthorDate: Tue Feb 7 00:57:31 2023 +0800
fix(controller): fix some bugs to pass AutoSwitchRoleIntegrationTest
1. fix some bugs to pass AutoSwitchRoleIntegrationTest
---
.../java/org/apache/rocketmq/broker/BrokerController.java | 2 +-
.../apache/rocketmq/broker/controller/ReplicasManager.java | 4 ++++
.../java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 10 ++++------
.../org/apache/rocketmq/controller/ControllerManager.java | 9 ++++-----
.../rocketmq/controller/impl/manager/ReplicasInfoManager.java | 3 ++-
.../controller/processor/ControllerRequestProcessor.java | 5 ++---
.../remoting/protocol/body/RoleChangeNotifyEntry.java | 11 +++++++++--
.../protocol/header/controller/ElectMasterResponseHeader.java | 10 ----------
.../test/autoswitchrole/AutoSwitchRoleIntegrationTest.java | 2 --
.../org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java | 6 ++++--
.../apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java | 6 ++++--
.../main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java | 6 ++++--
.../tools/command/controller/ReElectMasterSubCommand.java | 7 +++++--
13 files changed, 43 insertions(+), 38 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index a96aa0405..bf9565770 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1747,7 +1747,7 @@ public class BrokerController {
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId(),
+ this.replicasManager.getBrokerId(),
this.brokerConfig.getSendHeartbeatTimeoutMillis(),
this.brokerConfig.isInBrokerContainer(), this.replicasManager.getLastEpoch(),
this.messageStore.getMaxPhyOffset(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 4d2ed8e80..78f40495d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -772,4 +772,8 @@ public class ReplicasManager {
public List<String> getAvailableControllerAddresses() {
return new ArrayList<>(availableControllerAddresses.keySet());
}
+
+ public Long getBrokerId() {
+ return brokerId;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index ab8880625..2c9790b29 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -56,6 +56,7 @@ import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
@@ -3075,8 +3076,8 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
}
}
- public ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName, String brokerName,
- Long brokerId) throws MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, RemotingCommandException {
+ public Pair<ElectMasterResponseHeader, BrokerMemberGroup> electMaster(String controllerAddr, String clusterName, String brokerName,
+ Long brokerId) throws MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, RemotingCommandException {
//get controller leader address
final GetMetaDataResponseHeader controllerMetaData = this.getControllerMetaData(controllerAddr);
@@ -3092,10 +3093,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
case ResponseCode.SUCCESS: {
BrokerMemberGroup brokerMemberGroup = RemotingSerializable.decode(response.getBody(), BrokerMemberGroup.class);
ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
- if (null != responseHeader) {
- responseHeader.setBrokerMemberGroup(brokerMemberGroup);
- }
- return responseHeader;
+ return new Pair<>(responseHeader, brokerMemberGroup);
}
default:
break;
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 0f565ec81..96dcc6bc8 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -45,11 +45,11 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.RoleChangeNotifyEntry;
import org.apache.rocketmq.remoting.protocol.header.NotifyBrokerRoleChangedRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
@@ -131,11 +131,10 @@ public class ControllerManager {
final CompletableFuture<RemotingCommand> electMasterFuture = controller.electMaster(ElectMasterRequestHeader.ofControllerTrigger(brokerName));
final RemotingCommand electMasterResponse = electMasterFuture.get(5, TimeUnit.SECONDS);
- final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) electMasterResponse.readCustomHeader();
- if (responseHeader != null) {
- log.info("The broker with brokerId: {} in broker-set: {} shutdown, elect a new master done, result: {}", brokerId, brokerName, responseHeader);
+ if (electMasterResponse.getCode() == ResponseCode.SUCCESS) {
+ log.info("The broker with brokerId: {} in broker-set: {} shutdown, elect a new master done, result: {}", brokerId, brokerName, electMasterResponse);
if (controllerConfig.isNotifyBrokerRoleChanged()) {
- notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(responseHeader));
+ notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(electMasterResponse));
}
}
} catch (Exception e) {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index d2061bb24..5cf703c65 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -215,7 +215,6 @@ public class ReplicasInfoManager {
response.setSyncStateSetEpoch(syncStateSetEpoch + 1);
BrokerMemberGroup brokerMemberGroup = buildBrokerMemberGroup(brokerName);
if (null != brokerMemberGroup) {
- response.setBrokerMemberGroup(brokerMemberGroup);
result.setBody(brokerMemberGroup.encode());
}
final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster);
@@ -313,6 +312,8 @@ public class ReplicasInfoManager {
// if master still exist
response.setMasterBrokerId(syncStateInfo.getMasterBrokerId());
response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(response.getMasterBrokerId()));
+ response.setMasterEpoch(syncStateInfo.getMasterEpoch());
+ response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
}
// if this broker's address has been changed, we need to update it
if (!brokerAddress.equals(brokerReplicaInfo.getBrokerAddress(brokerId))) {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 9e300b738..e5cdd926f 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -130,11 +130,10 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().electMaster(electMasterRequest);
if (future != null) {
final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
- final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
- if (response.getCode() == ResponseCode.SUCCESS && responseHeader != null) {
+ if (response.getCode() == ResponseCode.SUCCESS) {
if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
- this.controllerManager.notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(responseHeader));
+ this.controllerManager.notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(response));
}
}
return response;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
index 91f9e1e8d..4f3f31218 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
@@ -18,6 +18,8 @@
package org.apache.rocketmq.remoting.protocol.body;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
public class RoleChangeNotifyEntry {
@@ -40,8 +42,13 @@ public class RoleChangeNotifyEntry {
this.masterBrokerId = masterBrokerId;
}
- public static RoleChangeNotifyEntry convert(ElectMasterResponseHeader header) {
- return new RoleChangeNotifyEntry(header.getBrokerMemberGroup(), header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(), header.getSyncStateSetEpoch());
+ public static RoleChangeNotifyEntry convert(RemotingCommand electMasterResponse) {
+ final ElectMasterResponseHeader header = (ElectMasterResponseHeader) electMasterResponse.readCustomHeader();
+ BrokerMemberGroup brokerMemberGroup = null;
+ if (electMasterResponse.getBody() != null && electMasterResponse.getBody().length > 0) {
+ brokerMemberGroup = RemotingSerializable.decode(electMasterResponse.getBody(), BrokerMemberGroup.class);
+ }
+ return new RoleChangeNotifyEntry(brokerMemberGroup, header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(), header.getSyncStateSetEpoch());
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
index 658e2b592..b8a4c58cf 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
@@ -26,7 +26,6 @@ public class ElectMasterResponseHeader implements CommandCustomHeader {
private String masterAddress;
private Integer masterEpoch;
private Integer syncStateSetEpoch;
- private BrokerMemberGroup brokerMemberGroup;
public ElectMasterResponseHeader() {
}
@@ -55,14 +54,6 @@ public class ElectMasterResponseHeader implements CommandCustomHeader {
this.syncStateSetEpoch = syncStateSetEpoch;
}
- public BrokerMemberGroup getBrokerMemberGroup() {
- return brokerMemberGroup;
- }
-
- public void setBrokerMemberGroup(BrokerMemberGroup brokerMemberGroup) {
- this.brokerMemberGroup = brokerMemberGroup;
- }
-
public void setMasterBrokerId(Long masterBrokerId) {
this.masterBrokerId = masterBrokerId;
}
@@ -78,7 +69,6 @@ public class ElectMasterResponseHeader implements CommandCustomHeader {
", masterAddress='" + masterAddress + '\'' +
", masterEpoch=" + masterEpoch +
", syncStateSetEpoch=" + syncStateSetEpoch +
- ", brokerMemberGroup=" + brokerMemberGroup +
'}';
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
index d145fc516..71855a837 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
@@ -179,9 +179,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
BrokerController broker3 = startBroker(nameserverAddress, controllerAddress, brokerName, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, DEFAULT_FILE_SIZE);
waitSlaveReady(broker3.getMessageStore());
-
checkMessage(broker3.getMessageStore(), topic, 10, 0);
-
putMessage(this.brokerController1.getMessageStore(), topic);
checkMessage(broker3.getMessageStore(), topic, 20, 0);
shutdownAndClearBroker();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index ce0c7a8a5..38fe063b2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageExt;
@@ -41,6 +42,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
@@ -832,8 +834,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName,
- String brokerName, Long brokerId) throws RemotingException, InterruptedException, MQBrokerException {
+ public Pair<ElectMasterResponseHeader, BrokerMemberGroup> electMaster(String controllerAddr, String clusterName,
+ String brokerName, Long brokerId) throws RemotingException, InterruptedException, MQBrokerException {
return this.defaultMQAdminExtImpl.electMaster(controllerAddr, clusterName, brokerName, brokerId);
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index ac4b51c59..3c55e3f24 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -48,6 +48,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -76,6 +77,7 @@ import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
@@ -1843,8 +1845,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName,
- String brokerName, Long brokerId) throws RemotingException, InterruptedException, MQBrokerException {
+ public Pair<ElectMasterResponseHeader, BrokerMemberGroup> electMaster(String controllerAddr, String clusterName,
+ String brokerName, Long brokerId) throws RemotingException, InterruptedException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().electMaster(controllerAddr, clusterName, brokerName, brokerId);
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index e8cb3e1f8..889974de8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -25,6 +25,7 @@ import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageExt;
@@ -38,6 +39,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
@@ -461,8 +463,8 @@ public interface MQAdminExt extends MQAdmin {
* @throws InterruptedException
* @throws MQBrokerException
*/
- ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName, String brokerName,
- Long brokerId) throws RemotingException, InterruptedException, MQBrokerException;
+ Pair<ElectMasterResponseHeader, BrokerMemberGroup> electMaster(String controllerAddr, String clusterName, String brokerName,
+ Long brokerId) throws RemotingException, InterruptedException, MQBrokerException;
/**
* clean controller broker meta data
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
index 1861754c5..e45f307fa 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
@@ -20,8 +20,10 @@ package org.apache.rocketmq.tools.command.controller;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
@@ -72,13 +74,14 @@ public class ReElectMasterSubCommand implements SubCommand {
try {
defaultMQAdminExt.start();
- final ElectMasterResponseHeader metaData = defaultMQAdminExt.electMaster(controllerAddress, clusterName, brokerName, brokerId);
+ final Pair<ElectMasterResponseHeader, BrokerMemberGroup> pair = defaultMQAdminExt.electMaster(controllerAddress, clusterName, brokerName, brokerId);
+ final ElectMasterResponseHeader metaData = pair.getObject1();
+ final BrokerMemberGroup brokerMemberGroup = pair.getObject2();
System.out.printf("\n#ClusterName\t%s", clusterName);
System.out.printf("\n#BrokerName\t%s", brokerName);
System.out.printf("\n#BrokerMasterAddr\t%s", metaData.getMasterAddress());
System.out.printf("\n#MasterEpoch\t%s", metaData.getMasterEpoch());
System.out.printf("\n#SyncStateSetEpoch\t%s\n", metaData.getSyncStateSetEpoch());
- BrokerMemberGroup brokerMemberGroup = metaData.getBrokerMemberGroup();
if (null != brokerMemberGroup && null != brokerMemberGroup.getBrokerAddrs()) {
brokerMemberGroup.getBrokerAddrs().forEach((key, value) -> System.out.printf("\t#Broker\t%d\t%s\n", key, value));
}