You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/20 15:37:51 UTC

[kafka] branch trunk updated: KAFKA-13914: Add command line tool kafka-metadata-quorum.sh (#12469)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 150fd5b0b1 KAFKA-13914: Add command line tool kafka-metadata-quorum.sh (#12469)
150fd5b0b1 is described below

commit 150fd5b0b18c4761d8f7d7ba9a480aa9f622024f
Author: dengziming <de...@gmail.com>
AuthorDate: Sat Aug 20 23:37:26 2022 +0800

    KAFKA-13914: Add command line tool kafka-metadata-quorum.sh (#12469)
    
    Add `MetadataQuorumCommand` to describe the metadata quorum status. The tool uses the argparse4j-style command format. Currently only one sub-command, "describe", is supported, and exactly one of two arguments must be specified with it: --status or --replication.
    
    ```
    # describe quorum status
    kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --replication
    
    NodeId          LogEndOffset    Lag     LastFetchTimeMs LastCaughtUpTimeMs      Status
    0               10                      0       -1                      -1                                       Leader
    1               10                      0       -1                      -1                                       Follower
    2               10                      0       -1                      -1                                       Follower
    
    kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
    ClusterId:                             fMCL8kv1SWm87L_Md-I2hg
    LeaderId:                             3002
    LeaderEpoch:                      2
    HighWatermark:                  10
    MaxFollowerLag:                 0
    MaxFollowerLagTimeMs:   -1
    CurrentVoters:                    [3000,3001,3002]
    CurrentObservers:              [0,1,2]
    
    # specify AdminClient properties
    kafka-metadata-quorum.sh --bootstrap-server localhost:9092 --command-config config.properties describe --status
    ```
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 bin/kafka-metadata-quorum.sh                       |  17 ++
 bin/windows/kafka-metatada-quorum.bat              |  17 ++
 build.gradle                                       |   1 +
 checkstyle/import-control.xml                      |   1 +
 .../kafka/clients/admin/KafkaAdminClient.java      |   2 +
 .../org/apache/kafka/clients/admin/QuorumInfo.java |  14 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |   2 +-
 .../scala/kafka/admin/MetadataQuorumCommand.scala  | 172 ++++++++++++++++++
 .../test/junit/RaftClusterInvocationContext.java   |   4 +
 .../kafka/admin/MetadataQuorumCommandTest.scala    | 192 +++++++++++++++++++++
 .../kafka/server/DescribeQuorumRequestTest.scala   |   2 +
 .../org/apache/kafka/server/util}/ToolsUtils.java  |  51 +++++-
 .../apache/kafka/tools/ProducerPerformance.java    |   1 +
 .../apache/kafka/tools/TransactionsCommand.java    |  46 +----
 14 files changed, 474 insertions(+), 48 deletions(-)

diff --git a/bin/kafka-metadata-quorum.sh b/bin/kafka-metadata-quorum.sh
new file mode 100755
index 0000000000..24bedbded1
--- /dev/null
+++ b/bin/kafka-metadata-quorum.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.MetadataQuorumCommand "$@"
diff --git a/bin/windows/kafka-metatada-quorum.bat b/bin/windows/kafka-metatada-quorum.bat
new file mode 100644
index 0000000000..4ea8e3109f
--- /dev/null
+++ b/bin/windows/kafka-metatada-quorum.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+"%~dp0kafka-run-class.bat" kafka.admin.MetadataQuorumCommand %*
diff --git a/build.gradle b/build.gradle
index f17011ca4d..7c38d899a6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1705,6 +1705,7 @@ project(':tools') {
 
   dependencies {
     implementation project(':clients')
+    implementation project(':server-common')
     implementation project(':log4j-appender')
     implementation libs.argparse4j
     implementation libs.jacksonDatabind
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 4b07a26cba..d24d1e7e5e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -366,6 +366,7 @@
 
   <subpackage name="tools">
     <allow pkg="org.apache.kafka.common"/>
+    <allow pkg="org.apache.kafka.server.util" />
     <allow pkg="org.apache.kafka.clients.admin" />
     <allow pkg="org.apache.kafka.clients.producer" />
     <allow pkg="org.apache.kafka.clients.consumer" />
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 41eb27a1dd..e5df779b61 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4359,6 +4359,8 @@ public class KafkaAdminClient extends AdminClient {
             private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
                 return new QuorumInfo(
                         partition.leaderId(),
+                        partition.leaderEpoch(),
+                        partition.highWatermark(),
                         partition.currentVoters().stream().map(v -> translateReplicaState(v)).collect(Collectors.toList()),
                         partition.observers().stream().map(o -> translateReplicaState(o)).collect(Collectors.toList()));
             }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java
index 75476d77dc..3a0b6cf6f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java
@@ -25,11 +25,15 @@ import java.util.OptionalLong;
  */
 public class QuorumInfo {
     private final Integer leaderId;
+    private final Integer leaderEpoch;
+    private final Long highWatermark;
     private final List<ReplicaState> voters;
     private final List<ReplicaState> observers;
 
-    QuorumInfo(Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+    QuorumInfo(Integer leaderId, Integer leaderEpoch, Long highWatermark, List<ReplicaState> voters, List<ReplicaState> observers) {
         this.leaderId = leaderId;
+        this.leaderEpoch = leaderEpoch;
+        this.highWatermark = highWatermark;
         this.voters = voters;
         this.observers = observers;
     }
@@ -38,6 +42,14 @@ public class QuorumInfo {
         return leaderId;
     }
 
+    public Integer leaderEpoch() {
+        return leaderEpoch;
+    }
+
+    public Long highWatermark() {
+        return highWatermark;
+    }
+
     public List<ReplicaState> voters() {
         return voters;
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 5faf53f075..193457655a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -608,7 +608,7 @@ public class KafkaAdminClientTest {
     }
 
     private static QuorumInfo defaultQuorumInfo(Boolean emptyOptionals) {
-        return new QuorumInfo(1,
+        return new QuorumInfo(1, 1, 1L,
                 singletonList(new QuorumInfo.ReplicaState(1, 100,
                         emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000),
                         emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000))),
diff --git a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala
new file mode 100644
index 0000000000..b6e4e1597b
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.tools.TerseFailure
+import kafka.utils.Exit
+import net.sourceforge.argparse4j.ArgumentParsers
+import net.sourceforge.argparse4j.impl.Arguments.{fileType, storeTrue}
+import net.sourceforge.argparse4j.inf.Subparsers
+import org.apache.kafka.clients._
+import org.apache.kafka.clients.admin.{Admin, QuorumInfo}
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.util.ToolsUtils.prettyPrintTable
+
+import java.io.File
+import java.util.Properties
+import scala.jdk.CollectionConverters._
+
+/**
+ * A tool for describing quorum status
+ */
+object MetadataQuorumCommand {
+
+  def main(args: Array[String]): Unit = {
+    val res = mainNoExit(args)
+    Exit.exit(res)
+  }
+
+  def mainNoExit(args: Array[String]): Int = {
+    val parser = ArgumentParsers
+      .newArgumentParser("kafka-metadata-quorum")
+      .defaultHelp(true)
+      .description("This tool describes kraft metadata quorum status.")
+    parser
+      .addArgument("--bootstrap-server")
+      .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
+      .required(true)
+
+    parser
+      .addArgument("--command-config")
+      .`type`(fileType())
+      .help("Property file containing configs to be passed to Admin Client.")
+    val subparsers = parser.addSubparsers().dest("command")
+    addDescribeParser(subparsers)
+
+    var admin: Admin = null
+    try {
+      val namespace = parser.parseArgsOrFail(args)
+      val command = namespace.getString("command")
+
+      val commandConfig = namespace.get[File]("command_config")
+      val props = if (commandConfig != null) {
+        if (!commandConfig.exists()) {
+          throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!")
+        }
+        Utils.loadProps(commandConfig.getPath)
+      } else {
+        new Properties()
+      }
+      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server"))
+      admin = Admin.create(props)
+
+      if (command == "describe") {
+        if (namespace.getBoolean("status") && namespace.getBoolean("replication")) {
+          throw new TerseFailure(s"Only one of --status or --replication should be specified with describe sub-command")
+        } else if (namespace.getBoolean("replication")) {
+          handleDescribeReplication(admin)
+        } else if (namespace.getBoolean("status")) {
+          handleDescribeStatus(admin)
+        } else {
+          throw new TerseFailure(s"One of --status or --replication must be specified with describe sub-command")
+        }
+      } else {
+        throw new IllegalStateException(s"Unknown command: $command, only 'describe' is supported")
+      }
+      0
+    } catch {
+      case e: TerseFailure =>
+        Console.err.println(e.getMessage)
+        1
+    } finally {
+      if (admin != null) {
+        admin.close()
+      }
+    }
+  }
+
+  def addDescribeParser(subparsers: Subparsers): Unit = {
+    val describeParser = subparsers
+      .addParser("describe")
+      .help("Describe the metadata quorum info")
+
+    val statusArgs = describeParser.addArgumentGroup("Status")
+    statusArgs
+      .addArgument("--status")
+      .help(
+        "A short summary of the quorum status and the other provides detailed information about the status of replication.")
+      .action(storeTrue())
+    val replicationArgs = describeParser.addArgumentGroup("Replication")
+    replicationArgs
+      .addArgument("--replication")
+      .help("Detailed information about the status of replication")
+      .action(storeTrue())
+  }
+
+  private def handleDescribeReplication(admin: Admin): Unit = {
+    val quorumInfo = admin.describeMetadataQuorum.quorumInfo.get
+    val leaderId = quorumInfo.leaderId
+    val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head
+
+    def convertQuorumInfo(infos: Seq[QuorumInfo.ReplicaState], status: String): Seq[Array[String]] =
+      infos.map { info =>
+        Array(info.replicaId,
+              info.logEndOffset,
+              leader.logEndOffset - info.logEndOffset,
+              info.lastFetchTimeMs.orElse(-1),
+              info.lastCaughtUpTimeMs.orElse(-1),
+              status
+        ).map(_.toString)
+      }
+    prettyPrintTable(
+      Array("NodeId", "LogEndOffset", "Lag", "LastFetchTimeMs", "LastCaughtUpTimeMs", "Status"),
+      (convertQuorumInfo(Seq(leader), "Leader")
+        ++ convertQuorumInfo(quorumInfo.voters.asScala.filter(_.replicaId != leaderId).toSeq, "Follower")
+        ++ convertQuorumInfo(quorumInfo.observers.asScala.toSeq, "Observer")).asJava,
+      scala.Console.out
+    )
+  }
+
+  private def handleDescribeStatus(admin: Admin): Unit = {
+    val clusterId = admin.describeCluster.clusterId.get
+    val quorumInfo = admin.describeMetadataQuorum.quorumInfo.get
+    val leaderId = quorumInfo.leaderId
+    val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head
+    val maxLagFollower = quorumInfo.voters.asScala
+      .minBy(_.logEndOffset)
+    val maxFollowerLag = leader.logEndOffset - maxLagFollower.logEndOffset
+    val maxFollowerLagTimeMs =
+      if (leader == maxLagFollower) {
+        0
+      } else if (leader.lastCaughtUpTimeMs.isPresent && maxLagFollower.lastCaughtUpTimeMs.isPresent) {
+        leader.lastCaughtUpTimeMs.getAsLong - maxLagFollower.lastCaughtUpTimeMs.getAsLong
+      } else {
+        -1
+      }
+    println(
+      s"""|ClusterId:              $clusterId
+          |LeaderId:               ${quorumInfo.leaderId}
+          |LeaderEpoch:            ${quorumInfo.leaderEpoch}
+          |HighWatermark:          ${quorumInfo.highWatermark}
+          |MaxFollowerLag:         $maxFollowerLag
+          |MaxFollowerLagTimeMs:   $maxFollowerLagTimeMs
+          |CurrentVoters:          ${quorumInfo.voters.asScala.map(_.replicaId).mkString("[", ",", "]")}
+          |CurrentObservers:       ${quorumInfo.observers.asScala.map(_.replicaId).mkString("[", ",", "]")}
+          |""".stripMargin
+    )
+  }
+}
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index 40669f3068..f5c281ff24 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -183,6 +183,10 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
             ));
         }
 
+        public Collection<ControllerServer> controllerServers() {
+            return controllers().collect(Collectors.toList());
+        }
+
         @Override
         public ClusterType clusterType() {
             return ClusterType.RAFT;
diff --git a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala
new file mode 100644
index 0000000000..24b6616cb1
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, ClusterTests, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.utils.TestUtils
+import org.apache.kafka.common.errors.UnsupportedVersionException
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import java.util.concurrent.ExecutionException
+
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT)
+@Tag("integration")
+class MetadataQuorumCommandTest(cluster: ClusterInstance) {
+
+  /**
+   * 1. The same number of brokers and controllers
+   * 2. More brokers than controllers
+   * 3. Fewer brokers than controllers
+   */
+  @ClusterTests(
+    Array(
+      new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3),
+      new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3),
+      new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2),
+      new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2),
+      new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3),
+      new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3)
+    ))
+  def testDescribeQuorumReplicationSuccessful(): Unit = {
+    cluster.waitForReadyBrokers()
+    val describeOutput = TestUtils.grabConsoleOutput(
+      MetadataQuorumCommand.mainNoExit(
+        Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication"))
+    )
+
+    val leaderPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Leader\s+""".r
+    val followerPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Follower\s+""".r
+    val observerPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Observer\s+""".r
+    val outputs = describeOutput.split("\n").tail
+    if (cluster.config().clusterType() == Type.CO_KRAFT) {
+      assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), outputs.length)
+    } else {
+      assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.length)
+    }
+    // `matches` is not supported in scala 2.12, use `findFirstIn` instead.
+    assertTrue(leaderPattern.findFirstIn(outputs.head).nonEmpty)
+    assertEquals(1, outputs.count(leaderPattern.findFirstIn(_).nonEmpty))
+    assertEquals(cluster.config().numControllers() - 1, outputs.count(followerPattern.findFirstIn(_).nonEmpty))
+
+    if (cluster.config().clusterType() == Type.CO_KRAFT) {
+      assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()), outputs.count(observerPattern.findFirstIn(_).nonEmpty))
+    } else {
+      assertEquals(cluster.config().numBrokers(), outputs.count(observerPattern.findFirstIn(_).nonEmpty))
+    }
+  }
+
+  /**
+   * 1. The same number of brokers and controllers
+   * 2. More brokers than controllers
+   * 3. Fewer brokers than controllers
+   */
+  @ClusterTests(
+    Array(
+      new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3),
+      new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3),
+      new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2),
+      new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2),
+      new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3),
+      new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3)
+    ))
+  def testDescribeQuorumStatusSuccessful(): Unit = {
+    cluster.waitForReadyBrokers()
+    val describeOutput = TestUtils.grabConsoleOutput(
+      MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status"))
+    )
+    val outputs = describeOutput.split("\n")
+
+    assertTrue("""ClusterId:\s+\S{22}""".r.findFirstIn(outputs(0)).nonEmpty)
+    assertTrue("""LeaderId:\s+\d+""".r.findFirstIn(outputs(1)).nonEmpty)
+    assertTrue("""LeaderEpoch:\s+\d+""".r.findFirstIn(outputs(2)).nonEmpty)
+    // HighWatermark may be -1
+    assertTrue("""HighWatermark:\s+[-]?\d+""".r.findFirstIn(outputs(3)).nonEmpty)
+    assertTrue("""MaxFollowerLag:\s+\d+""".r.findFirstIn(outputs(4)).nonEmpty)
+    assertTrue("""MaxFollowerLagTimeMs:\s+[-]?\d+""".r.findFirstIn(outputs(5)).nonEmpty)
+    assertTrue("""CurrentVoters:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(6)).nonEmpty)
+
+    // There are no observers if we have fewer brokers than controllers
+    if (cluster.config().clusterType() == Type.CO_KRAFT
+        && cluster.config().numBrokers() <= cluster.config().numControllers()) {
+      assertTrue("""CurrentObservers:\s+\[\]""".r.findFirstIn(outputs(7)).nonEmpty)
+    } else {
+      assertTrue("""CurrentObservers:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(7)).nonEmpty)
+    }
+  }
+
+  @ClusterTests(
+    Array(new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1),
+          new ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 1)))
+  def testOnlyOneBrokerAndOneController(): Unit = {
+    val statusOutput = TestUtils.grabConsoleOutput(
+      MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status"))
+    )
+    assertEquals("MaxFollowerLag:         0", statusOutput.split("\n")(4))
+    assertEquals("MaxFollowerLagTimeMs:   0", statusOutput.split("\n")(5))
+
+    val replicationOutput = TestUtils.grabConsoleOutput(
+      MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication"))
+    )
+    assertEquals("0", replicationOutput.split("\n")(1).split("\\s+")(2))
+  }
+
+  @ClusterTest(clusterType = Type.ZK, brokers = 3)
+  def testDescribeQuorumInZkMode(): Unit = {
+    assertTrue(
+      assertThrows(
+        classOf[ExecutionException],
+        () =>
+          MetadataQuorumCommand.mainNoExit(
+            Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status"))
+      ).getCause.isInstanceOf[UnsupportedVersionException]
+    )
+    assertTrue(
+      assertThrows(
+        classOf[ExecutionException],
+        () =>
+          MetadataQuorumCommand.mainNoExit(
+            Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication"))
+      ).getCause.isInstanceOf[UnsupportedVersionException]
+    )
+  }
+}
+
+class MetadataQuorumCommandErrorTest {
+
+  @Test
+  def testPropertiesFileDoesNotExists(): Unit = {
+    assertEquals(1,
+                 MetadataQuorumCommand.mainNoExit(
+                   Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe")))
+    assertEquals(
+      "Properties file admin.properties does not exists!",
+      TestUtils
+        .grabConsoleError(
+          MetadataQuorumCommand.mainNoExit(
+            Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe")))
+        .trim
+    )
+  }
+
+  @Test
+  def testDescribeOptions(): Unit = {
+    assertEquals(1, MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "describe")))
+    assertEquals(
+      "One of --status or --replication must be specified with describe sub-command",
+      TestUtils
+        .grabConsoleError(MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "describe")))
+        .trim
+    )
+
+    assertEquals(1,
+                 MetadataQuorumCommand.mainNoExit(
+                   Array("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication")))
+    assertEquals(
+      "Only one of --status or --replication should be specified with describe sub-command",
+      TestUtils
+        .grabConsoleError(
+          MetadataQuorumCommand.mainNoExit(
+            Array("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication")))
+        .trim
+    )
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index eed58961e4..28a6f80123 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -74,6 +74,8 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
       val leaderId = partitionData.leaderId
       assertTrue(leaderId > 0)
+      assertTrue(partitionData.leaderEpoch() > 0)
+      assertTrue(partitionData.highWatermark() > 0)
 
       val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
         .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java
similarity index 61%
rename from tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
rename to server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java
index 3a80b5811f..0c923cd66c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java
@@ -14,13 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.tools;
+package org.apache.kafka.server.util;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 public class ToolsUtils {
 
@@ -52,4 +56,49 @@ public class ToolsUtils {
             }
         }
     }
+
+    private static void appendColumnValue(
+        StringBuilder rowBuilder,
+        String value,
+        int length
+    ) {
+        int padLength = length - value.length();
+        rowBuilder.append(value);
+        for (int i = 0; i < padLength; i++)
+            rowBuilder.append(' ');
+    }
+
+    private static void printRow(
+        List<Integer> columnLengths,
+        String[] row,
+        PrintStream out
+    ) {
+        StringBuilder rowBuilder = new StringBuilder();
+        for (int i = 0; i < row.length; i++) {
+            Integer columnLength = columnLengths.get(i);
+            String columnValue = row[i];
+            appendColumnValue(rowBuilder, columnValue, columnLength);
+            rowBuilder.append('\t');
+        }
+        out.println(rowBuilder);
+    }
+
+    public static void prettyPrintTable(
+        String[] headers,
+        List<String[]> rows,
+        PrintStream out
+    ) {
+        List<Integer> columnLengths = Arrays.stream(headers)
+            .map(String::length)
+            .collect(Collectors.toList());
+
+        for (String[] row : rows) {
+            for (int i = 0; i < headers.length; i++) {
+                columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length()));
+            }
+        }
+
+        printRow(columnLengths, headers, out);
+        rows.forEach(row -> printRow(columnLengths, row, out));
+    }
 }
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 6967a16fa6..f2ee53cb3f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -43,6 +43,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.ToolsUtils;
 
 public class ProducerPerformance {
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index 92e713ac3b..194524d265 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -66,6 +66,7 @@ import java.util.stream.Collectors;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static org.apache.kafka.server.util.ToolsUtils.prettyPrintTable;
 
 public abstract class TransactionsCommand {
     private static final Logger log = LoggerFactory.getLogger(TransactionsCommand.class);
@@ -903,51 +904,6 @@ public abstract class TransactionsCommand {
         }
     }
 
-    private static void appendColumnValue(
-        StringBuilder rowBuilder,
-        String value,
-        int length
-    ) {
-        int padLength = length - value.length();
-        rowBuilder.append(value);
-        for (int i = 0; i < padLength; i++)
-            rowBuilder.append(' ');
-    }
-
-    private static void printRow(
-        List<Integer> columnLengths,
-        String[] row,
-        PrintStream out
-    ) {
-        StringBuilder rowBuilder = new StringBuilder();
-        for (int i = 0; i < row.length; i++) {
-            Integer columnLength = columnLengths.get(i);
-            String columnValue = row[i];
-            appendColumnValue(rowBuilder, columnValue, columnLength);
-            rowBuilder.append('\t');
-        }
-        out.println(rowBuilder);
-    }
-
-    private static void prettyPrintTable(
-        String[] headers,
-        List<String[]> rows,
-        PrintStream out
-    ) {
-        List<Integer> columnLengths = Arrays.stream(headers)
-            .map(String::length)
-            .collect(Collectors.toList());
-
-        for (String[] row : rows) {
-            for (int i = 0; i < headers.length; i++) {
-                columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length()));
-            }
-        }
-
-        printRow(columnLengths, headers, out);
-        rows.forEach(row -> printRow(columnLengths, row, out));
-    }
-
     private static void printErrorAndExit(String message, Throwable t) {
         log.debug(message, t);