You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/06/04 08:36:52 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of code] Add admin tool getControllerMetadata (#4405)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new 05841188e [Summer of code] Add admin tool getControllerMetadata (#4405)
05841188e is described below
commit 05841188e2c22c04c99ede878ea541be9b28c6f2
Author: hzh0425 <64...@qq.com>
AuthorDate: Sat Jun 4 16:36:40 2022 +0800
[Summer of code] Add admin tool getControllerMetadata (#4405)
* add tool get controller metadata
* fix bug
---
.../controller/GetMetaDataResponseHeader.java | 26 ++++++++-
.../controller/impl/DLedgerController.java | 10 +++-
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 6 ++
.../tools/admin/DefaultMQAdminExtImpl.java | 6 ++
.../apache/rocketmq/tools/admin/MQAdminExt.java | 2 +
.../rocketmq/tools/command/MQAdminStartup.java | 2 +
.../controller/GetControllerMetaDataCommand.java | 68 ++++++++++++++++++++++
7 files changed, 117 insertions(+), 3 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetMetaDataResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetMetaDataResponseHeader.java
index a6d738a8f..339a9ea85 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetMetaDataResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetMetaDataResponseHeader.java
@@ -20,17 +20,29 @@ import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetMetaDataResponseHeader implements CommandCustomHeader {
+ private String group;
private String controllerLeaderId;
private String controllerLeaderAddress;
private boolean isLeader;
+ private String peers;
public GetMetaDataResponseHeader() {
}
- public GetMetaDataResponseHeader(String controllerLeaderId, String controllerLeaderAddress, boolean isLeader) {
+ public GetMetaDataResponseHeader(String group, String controllerLeaderId, String controllerLeaderAddress, boolean isLeader, String peers) {
+ this.group = group;
this.controllerLeaderId = controllerLeaderId;
this.controllerLeaderAddress = controllerLeaderAddress;
this.isLeader = isLeader;
+ this.peers = peers;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void setGroup(String group) {
+ this.group = group;
}
public String getControllerLeaderId() {
@@ -57,11 +69,21 @@ public class GetMetaDataResponseHeader implements CommandCustomHeader {
isLeader = leader;
}
+ public String getPeers() {
+ return peers;
+ }
+
+ public void setPeers(String peers) {
+ this.peers = peers;
+ }
+
@Override public String toString() {
return "GetMetaDataResponseHeader{" +
- "controllerLeaderId='" + controllerLeaderId + '\'' +
+ "group='" + group + '\'' +
+ ", controllerLeaderId='" + controllerLeaderId + '\'' +
", controllerLeaderAddress='" + controllerLeaderAddress + '\'' +
", isLeader=" + isLeader +
+ ", peers='" + peers + '\'' +
'}';
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index ce20ffcfe..42c3d34f6 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -26,6 +26,7 @@ import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -164,7 +165,14 @@ public class DLedgerController implements Controller {
@Override
public RemotingCommand getControllerMetadata() {
final MemberState state = getMemberState();
- return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, new GetMetaDataResponseHeader(state.getLeaderId(), state.getLeaderAddr(), state.isLeader()));
+ final Map<String, String> peers = state.getPeerMap();
+ final StringBuilder sb = new StringBuilder();
+ for (Map.Entry<String, String> entry : peers.entrySet()) {
+ final String peer = entry.getKey() + ":" + entry.getValue();
+ sb.append(peer).append(";");
+ }
+ return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, new GetMetaDataResponseHeader(
+ state.getGroup(), state.getLeaderId(), state.getLeaderAddr(), state.isLeader(), sb.toString()));
}
/**
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index d7de1124d..f4f295bfa 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -52,6 +52,7 @@ import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
@@ -743,6 +744,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return this.defaultMQAdminExtImpl.getBrokerEpochCache(brokerAddr);
}
+ public GetMetaDataResponseHeader getControllerMetaData(
+ String controllerAddr) throws RemotingException, InterruptedException, MQBrokerException {
+ return this.defaultMQAdminExtImpl.getControllerMetaData(controllerAddr);
+ }
+
@Override
public void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset)
throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 302167309..81310603b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -86,6 +86,7 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateGroupForbiddenRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -1617,6 +1618,11 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return this.mqClientInstance.getMQClientAPIImpl().getBrokerEpochCache(brokerAddr);
}
+ @Override public GetMetaDataResponseHeader getControllerMetaData(
+ String controllerAddr) throws RemotingException, InterruptedException, MQBrokerException {
+ return this.mqClientInstance.getMQClientAPIImpl().getControllerMetaData(controllerAddr);
+ }
+
@Override public void resetMasterFlushOffset(String brokerAddr,
long masterFlushOffset) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
this.mqClientInstance.getMQClientAPIImpl().resetMasterFlushOffset(brokerAddr, masterFlushOffset);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 1b3492277..201b8c4e5 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -51,6 +51,7 @@ import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
@@ -388,6 +389,7 @@ public interface MQAdminExt extends MQAdmin {
EpochEntryCache getBrokerEpochCache(String brokerAddr) throws RemotingException, InterruptedException, MQBrokerException;
+ GetMetaDataResponseHeader getControllerMetaData(String controllerAddr) throws RemotingException, InterruptedException, MQBrokerException;
/**
* Reset master flush offset in slave
*
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 96a4d0946..1f537dce4 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -57,6 +57,7 @@ import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
import org.apache.rocketmq.tools.command.container.AddBrokerSubCommand;
import org.apache.rocketmq.tools.command.container.RemoveBrokerSubCommand;
+import org.apache.rocketmq.tools.command.controller.GetControllerMetaDataCommand;
import org.apache.rocketmq.tools.command.export.ExportConfigsCommand;
import org.apache.rocketmq.tools.command.export.ExportMetadataCommand;
import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
@@ -250,6 +251,7 @@ public class MQAdminStartup {
initCommand(new SyncStateSetCommand());
initCommand(new GetBrokerEpochCommand());
+ initCommand(new GetControllerMetaDataCommand());
}
private static void initLogback() throws JoranException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java
new file mode 100644
index 000000000..271272df9
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.controller;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class GetControllerMetaDataCommand implements SubCommand {
+ @Override public String commandName() {
+ return "getControllerMetaData";
+ }
+
+ @Override public String commandDesc() {
+ return "get controller cluster's metadata";
+ }
+
+ @Override public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("a", "controllerAddress", true, "the address of controller");
+ opt.setRequired(true);
+ options.addOption(opt);
+ return options;
+ }
+
+ @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+ String controllerAddress = commandLine.getOptionValue('a').trim();
+ try {
+ defaultMQAdminExt.start();
+ final GetMetaDataResponseHeader metaData = defaultMQAdminExt.getControllerMetaData(controllerAddress);
+ System.out.printf("\n#ControllerGroup\t%s", metaData.getGroup());
+ System.out.printf("\n#ControllerLeaderId\t%s", metaData.getControllerLeaderId());
+ System.out.printf("\n#ControllerLeaderAddress\t%s", metaData.getControllerLeaderAddress());
+ final String peers = metaData.getPeers();
+ if (StringUtils.isNotEmpty(peers)) {
+ final String[] peerList = peers.split(";");
+ for (String peer : peerList) {
+ System.out.printf("\n#Peer:\t%s", peer);
+ }
+ }
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}