You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/06/04 08:36:52 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of code] Add admin tool getControllerMetadata (#4405)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new 05841188e [Summer of code] Add admin tool getControllerMetadata (#4405)
05841188e is described below

commit 05841188e2c22c04c99ede878ea541be9b28c6f2
Author: hzh0425 <64...@qq.com>
AuthorDate: Sat Jun 4 16:36:40 2022 +0800

    [Summer of code] Add admin tool getControllerMetadata (#4405)
    
    * add tool get controller metadata
    
    * fix bug
---
 .../controller/GetMetaDataResponseHeader.java      | 26 ++++++++-
 .../controller/impl/DLedgerController.java         | 10 +++-
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |  6 ++
 .../tools/admin/DefaultMQAdminExtImpl.java         |  6 ++
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |  2 +
 .../rocketmq/tools/command/MQAdminStartup.java     |  2 +
 .../controller/GetControllerMetaDataCommand.java   | 68 ++++++++++++++++++++++
 7 files changed, 117 insertions(+), 3 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetMetaDataResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetMetaDataResponseHeader.java
index a6d738a8f..339a9ea85 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetMetaDataResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetMetaDataResponseHeader.java
@@ -20,17 +20,29 @@ import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class GetMetaDataResponseHeader implements CommandCustomHeader {
+    private String group;
     private String controllerLeaderId;
     private String controllerLeaderAddress;
     private boolean isLeader;
+    private String peers;
 
     public GetMetaDataResponseHeader() {
     }
 
-    public GetMetaDataResponseHeader(String controllerLeaderId, String controllerLeaderAddress, boolean isLeader) {
+    public GetMetaDataResponseHeader(String group, String controllerLeaderId, String controllerLeaderAddress, boolean isLeader, String peers) {
+        this.group = group;
         this.controllerLeaderId = controllerLeaderId;
         this.controllerLeaderAddress = controllerLeaderAddress;
         this.isLeader = isLeader;
+        this.peers = peers;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
     }
 
     public String getControllerLeaderId() {
@@ -57,11 +69,21 @@ public class GetMetaDataResponseHeader implements CommandCustomHeader {
         isLeader = leader;
     }
 
+    public String getPeers() {
+        return peers;
+    }
+
+    public void setPeers(String peers) {
+        this.peers = peers;
+    }
+
     @Override public String toString() {
         return "GetMetaDataResponseHeader{" +
-            "controllerLeaderId='" + controllerLeaderId + '\'' +
+            "group='" + group + '\'' +
+            ", controllerLeaderId='" + controllerLeaderId + '\'' +
             ", controllerLeaderAddress='" + controllerLeaderAddress + '\'' +
             ", isLeader=" + isLeader +
+            ", peers='" + peers + '\'' +
             '}';
     }
 
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index ce20ffcfe..42c3d34f6 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -26,6 +26,7 @@ import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
 import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -164,7 +165,14 @@ public class DLedgerController implements Controller {
     @Override
     public RemotingCommand getControllerMetadata() {
         final MemberState state = getMemberState();
-        return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, new GetMetaDataResponseHeader(state.getLeaderId(), state.getLeaderAddr(), state.isLeader()));
+        final Map<String, String> peers = state.getPeerMap();
+        final StringBuilder sb = new StringBuilder();
+        for (Map.Entry<String, String> entry : peers.entrySet()) {
+            final String peer = entry.getKey() + ":" + entry.getValue();
+            sb.append(peer).append(";");
+        }
+        return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, new GetMetaDataResponseHeader(
+            state.getGroup(), state.getLeaderId(), state.getLeaderAddr(), state.isLeader(), sb.toString()));
     }
 
     /**
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index d7de1124d..f4f295bfa 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -52,6 +52,7 @@ import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
@@ -743,6 +744,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
         return this.defaultMQAdminExtImpl.getBrokerEpochCache(brokerAddr);
     }
 
+    public GetMetaDataResponseHeader getControllerMetaData(
+        String controllerAddr) throws RemotingException, InterruptedException, MQBrokerException {
+        return this.defaultMQAdminExtImpl.getControllerMetaData(controllerAddr);
+    }
+
     @Override
     public void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset)
         throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 302167309..81310603b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -86,6 +86,7 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateGroupForbiddenRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -1617,6 +1618,11 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return this.mqClientInstance.getMQClientAPIImpl().getBrokerEpochCache(brokerAddr);
     }
 
+    @Override public GetMetaDataResponseHeader getControllerMetaData(
+        String controllerAddr) throws RemotingException, InterruptedException, MQBrokerException {
+        return this.mqClientInstance.getMQClientAPIImpl().getControllerMetaData(controllerAddr);
+    }
+
     @Override public void resetMasterFlushOffset(String brokerAddr,
         long masterFlushOffset) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
         this.mqClientInstance.getMQClientAPIImpl().resetMasterFlushOffset(brokerAddr, masterFlushOffset);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 1b3492277..201b8c4e5 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -51,6 +51,7 @@ import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
@@ -388,6 +389,7 @@ public interface MQAdminExt extends MQAdmin {
 
     EpochEntryCache getBrokerEpochCache(String brokerAddr) throws RemotingException, InterruptedException, MQBrokerException;
 
+    GetMetaDataResponseHeader getControllerMetaData(String controllerAddr) throws RemotingException, InterruptedException, MQBrokerException;
     /**
      * Reset master flush offset in slave
      *
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 96a4d0946..1f537dce4 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -57,6 +57,7 @@ import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
 import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
 import org.apache.rocketmq.tools.command.container.AddBrokerSubCommand;
 import org.apache.rocketmq.tools.command.container.RemoveBrokerSubCommand;
+import org.apache.rocketmq.tools.command.controller.GetControllerMetaDataCommand;
 import org.apache.rocketmq.tools.command.export.ExportConfigsCommand;
 import org.apache.rocketmq.tools.command.export.ExportMetadataCommand;
 import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
@@ -250,6 +251,7 @@ public class MQAdminStartup {
 
         initCommand(new SyncStateSetCommand());
         initCommand(new GetBrokerEpochCommand());
+        initCommand(new GetControllerMetaDataCommand());
     }
 
     private static void initLogback() throws JoranException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java
new file mode 100644
index 000000000..271272df9
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.controller;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class GetControllerMetaDataCommand implements SubCommand {
+    @Override public String commandName() {
+        return "getControllerMetaData";
+    }
+
+    @Override public String commandDesc() {
+        return "get controller cluster's metadata";
+    }
+
+    @Override public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("a", "controllerAddress", true, "the address of controller");
+        opt.setRequired(true);
+        options.addOption(opt);
+        return options;
+    }
+
+    @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        String controllerAddress = commandLine.getOptionValue('a').trim();
+        try {
+            defaultMQAdminExt.start();
+            final GetMetaDataResponseHeader metaData = defaultMQAdminExt.getControllerMetaData(controllerAddress);
+            System.out.printf("\n#ControllerGroup\t%s", metaData.getGroup());
+            System.out.printf("\n#ControllerLeaderId\t%s", metaData.getControllerLeaderId());
+            System.out.printf("\n#ControllerLeaderAddress\t%s", metaData.getControllerLeaderAddress());
+            final String peers = metaData.getPeers();
+            if (StringUtils.isNotEmpty(peers)) {
+                final String[] peerList = peers.split(";");
+                for (String peer : peerList) {
+                    System.out.printf("\n#Peer:\t%s", peer);
+                }
+            }
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}