You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/20 15:37:51 UTC
[kafka] branch trunk updated: KAFKA-13914: Add command line tool kafka-metadata-quorum.sh (#12469)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 150fd5b0b1 KAFKA-13914: Add command line tool kafka-metadata-quorum.sh (#12469)
150fd5b0b1 is described below
commit 150fd5b0b18c4761d8f7d7ba9a480aa9f622024f
Author: dengziming <de...@gmail.com>
AuthorDate: Sat Aug 20 23:37:26 2022 +0800
KAFKA-13914: Add command line tool kafka-metadata-quorum.sh (#12469)
Add `MetadataQuorumCommand` to describe quorum status, I'm trying to use arg4j style command format, currently, we only support one sub-command which is "describe" and we can specify 2 arguments which are --status and --replication.
```
# describe quorum status
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --replication
ReplicaId LogEndOffset Lag LastFetchTimeMs LastCaughtUpTimeMs Status
0 10 0 -1 -1 Leader
1 10 0 -1 -1 Follower
2 10 0 -1 -1 Follower
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
ClusterId: fMCL8kv1SWm87L_Md-I2hg
LeaderId: 3002
LeaderEpoch: 2
HighWatermark: 10
MaxFollowerLag: 0
MaxFollowerLagTimeMs: -1
CurrentVoters: [3000,3001,3002]
CurrentObservers: [0,1,2]
# specify AdminClient properties
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 --command-config config.properties describe --status
```
Reviewers: Jason Gustafson <ja...@confluent.io>
---
bin/kafka-metadata-quorum.sh | 17 ++
bin/windows/kafka-metatada-quorum.bat | 17 ++
build.gradle | 1 +
checkstyle/import-control.xml | 1 +
.../kafka/clients/admin/KafkaAdminClient.java | 2 +
.../org/apache/kafka/clients/admin/QuorumInfo.java | 14 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 2 +-
.../scala/kafka/admin/MetadataQuorumCommand.scala | 172 ++++++++++++++++++
.../test/junit/RaftClusterInvocationContext.java | 4 +
.../kafka/admin/MetadataQuorumCommandTest.scala | 192 +++++++++++++++++++++
.../kafka/server/DescribeQuorumRequestTest.scala | 2 +
.../org/apache/kafka/server/util}/ToolsUtils.java | 51 +++++-
.../apache/kafka/tools/ProducerPerformance.java | 1 +
.../apache/kafka/tools/TransactionsCommand.java | 46 +----
14 files changed, 474 insertions(+), 48 deletions(-)
diff --git a/bin/kafka-metadata-quorum.sh b/bin/kafka-metadata-quorum.sh
new file mode 100755
index 0000000000..24bedbded1
--- /dev/null
+++ b/bin/kafka-metadata-quorum.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.MetadataQuorumCommand "$@"
diff --git a/bin/windows/kafka-metatada-quorum.bat b/bin/windows/kafka-metatada-quorum.bat
new file mode 100644
index 0000000000..4ea8e3109f
--- /dev/null
+++ b/bin/windows/kafka-metatada-quorum.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+"%~dp0kafka-run-class.bat" kafka.admin.MetadataQuorumCommand %*
diff --git a/build.gradle b/build.gradle
index f17011ca4d..7c38d899a6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1705,6 +1705,7 @@ project(':tools') {
dependencies {
implementation project(':clients')
+ implementation project(':server-common')
implementation project(':log4j-appender')
implementation libs.argparse4j
implementation libs.jacksonDatabind
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 4b07a26cba..d24d1e7e5e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -366,6 +366,7 @@
<subpackage name="tools">
<allow pkg="org.apache.kafka.common"/>
+ <allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.consumer" />
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 41eb27a1dd..e5df779b61 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4359,6 +4359,8 @@ public class KafkaAdminClient extends AdminClient {
private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
return new QuorumInfo(
partition.leaderId(),
+ partition.leaderEpoch(),
+ partition.highWatermark(),
partition.currentVoters().stream().map(v -> translateReplicaState(v)).collect(Collectors.toList()),
partition.observers().stream().map(o -> translateReplicaState(o)).collect(Collectors.toList()));
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java
index 75476d77dc..3a0b6cf6f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java
@@ -25,11 +25,15 @@ import java.util.OptionalLong;
*/
public class QuorumInfo {
private final Integer leaderId;
+ private final Integer leaderEpoch;
+ private final Long highWatermark;
private final List<ReplicaState> voters;
private final List<ReplicaState> observers;
- QuorumInfo(Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+ QuorumInfo(Integer leaderId, Integer leaderEpoch, Long highWatermark, List<ReplicaState> voters, List<ReplicaState> observers) {
this.leaderId = leaderId;
+ this.leaderEpoch = leaderEpoch;
+ this.highWatermark = highWatermark;
this.voters = voters;
this.observers = observers;
}
@@ -38,6 +42,14 @@ public class QuorumInfo {
return leaderId;
}
+ public Integer leaderEpoch() {
+ return leaderEpoch;
+ }
+
+ public Long highWatermark() {
+ return highWatermark;
+ }
+
public List<ReplicaState> voters() {
return voters;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 5faf53f075..193457655a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -608,7 +608,7 @@ public class KafkaAdminClientTest {
}
private static QuorumInfo defaultQuorumInfo(Boolean emptyOptionals) {
- return new QuorumInfo(1,
+ return new QuorumInfo(1, 1, 1L,
singletonList(new QuorumInfo.ReplicaState(1, 100,
emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000),
emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000))),
diff --git a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala
new file mode 100644
index 0000000000..b6e4e1597b
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.tools.TerseFailure
+import kafka.utils.Exit
+import net.sourceforge.argparse4j.ArgumentParsers
+import net.sourceforge.argparse4j.impl.Arguments.{fileType, storeTrue}
+import net.sourceforge.argparse4j.inf.Subparsers
+import org.apache.kafka.clients._
+import org.apache.kafka.clients.admin.{Admin, QuorumInfo}
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.util.ToolsUtils.prettyPrintTable
+
+import java.io.File
+import java.util.Properties
+import scala.jdk.CollectionConverters._
+
+/**
+ * A tool for describing quorum status
+ */
+object MetadataQuorumCommand {
+
+ def main(args: Array[String]): Unit = {
+ val res = mainNoExit(args)
+ Exit.exit(res)
+ }
+
+ def mainNoExit(args: Array[String]): Int = {
+ val parser = ArgumentParsers
+ .newArgumentParser("kafka-metadata-quorum")
+ .defaultHelp(true)
+ .description("This tool describes kraft metadata quorum status.")
+ parser
+ .addArgument("--bootstrap-server")
+ .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
+ .required(true)
+
+ parser
+ .addArgument("--command-config")
+ .`type`(fileType())
+ .help("Property file containing configs to be passed to Admin Client.")
+ val subparsers = parser.addSubparsers().dest("command")
+ addDescribeParser(subparsers)
+
+ var admin: Admin = null
+ try {
+ val namespace = parser.parseArgsOrFail(args)
+ val command = namespace.getString("command")
+
+ val commandConfig = namespace.get[File]("command_config")
+ val props = if (commandConfig != null) {
+ if (!commandConfig.exists()) {
+ throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!")
+ }
+ Utils.loadProps(commandConfig.getPath)
+ } else {
+ new Properties()
+ }
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server"))
+ admin = Admin.create(props)
+
+ if (command == "describe") {
+ if (namespace.getBoolean("status") && namespace.getBoolean("replication")) {
+ throw new TerseFailure(s"Only one of --status or --replication should be specified with describe sub-command")
+ } else if (namespace.getBoolean("replication")) {
+ handleDescribeReplication(admin)
+ } else if (namespace.getBoolean("status")) {
+ handleDescribeStatus(admin)
+ } else {
+ throw new TerseFailure(s"One of --status or --replication must be specified with describe sub-command")
+ }
+ } else {
+ throw new IllegalStateException(s"Unknown command: $command, only 'describe' is supported")
+ }
+ 0
+ } catch {
+ case e: TerseFailure =>
+ Console.err.println(e.getMessage)
+ 1
+ } finally {
+ if (admin != null) {
+ admin.close()
+ }
+ }
+ }
+
+ def addDescribeParser(subparsers: Subparsers): Unit = {
+ val describeParser = subparsers
+ .addParser("describe")
+ .help("Describe the metadata quorum info")
+
+ val statusArgs = describeParser.addArgumentGroup("Status")
+ statusArgs
+ .addArgument("--status")
+ .help(
+ "A short summary of the quorum status and the other provides detailed information about the status of replication.")
+ .action(storeTrue())
+ val replicationArgs = describeParser.addArgumentGroup("Replication")
+ replicationArgs
+ .addArgument("--replication")
+ .help("Detailed information about the status of replication")
+ .action(storeTrue())
+ }
+
+ private def handleDescribeReplication(admin: Admin): Unit = {
+ val quorumInfo = admin.describeMetadataQuorum.quorumInfo.get
+ val leaderId = quorumInfo.leaderId
+ val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head
+
+ def convertQuorumInfo(infos: Seq[QuorumInfo.ReplicaState], status: String): Seq[Array[String]] =
+ infos.map { info =>
+ Array(info.replicaId,
+ info.logEndOffset,
+ leader.logEndOffset - info.logEndOffset,
+ info.lastFetchTimeMs.orElse(-1),
+ info.lastCaughtUpTimeMs.orElse(-1),
+ status
+ ).map(_.toString)
+ }
+ prettyPrintTable(
+ Array("NodeId", "LogEndOffset", "Lag", "LastFetchTimeMs", "LastCaughtUpTimeMs", "Status"),
+ (convertQuorumInfo(Seq(leader), "Leader")
+ ++ convertQuorumInfo(quorumInfo.voters.asScala.filter(_.replicaId != leaderId).toSeq, "Follower")
+ ++ convertQuorumInfo(quorumInfo.observers.asScala.toSeq, "Observer")).asJava,
+ scala.Console.out
+ )
+ }
+
+ private def handleDescribeStatus(admin: Admin): Unit = {
+ val clusterId = admin.describeCluster.clusterId.get
+ val quorumInfo = admin.describeMetadataQuorum.quorumInfo.get
+ val leaderId = quorumInfo.leaderId
+ val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head
+ val maxLagFollower = quorumInfo.voters.asScala
+ .minBy(_.logEndOffset)
+ val maxFollowerLag = leader.logEndOffset - maxLagFollower.logEndOffset
+ val maxFollowerLagTimeMs =
+ if (leader == maxLagFollower) {
+ 0
+ } else if (leader.lastCaughtUpTimeMs.isPresent && maxLagFollower.lastCaughtUpTimeMs.isPresent) {
+ leader.lastCaughtUpTimeMs.getAsLong - maxLagFollower.lastCaughtUpTimeMs.getAsLong
+ } else {
+ -1
+ }
+ println(
+ s"""|ClusterId: $clusterId
+ |LeaderId: ${quorumInfo.leaderId}
+ |LeaderEpoch: ${quorumInfo.leaderEpoch}
+ |HighWatermark: ${quorumInfo.highWatermark}
+ |MaxFollowerLag: $maxFollowerLag
+ |MaxFollowerLagTimeMs: $maxFollowerLagTimeMs
+ |CurrentVoters: ${quorumInfo.voters.asScala.map(_.replicaId).mkString("[", ",", "]")}
+ |CurrentObservers: ${quorumInfo.observers.asScala.map(_.replicaId).mkString("[", ",", "]")}
+ |""".stripMargin
+ )
+ }
+}
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index 40669f3068..f5c281ff24 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -183,6 +183,10 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
));
}
+ public Collection<ControllerServer> controllerServers() {
+ return controllers().collect(Collectors.toList());
+ }
+
@Override
public ClusterType clusterType() {
return ClusterType.RAFT;
diff --git a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala
new file mode 100644
index 0000000000..24b6616cb1
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, ClusterTests, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.utils.TestUtils
+import org.apache.kafka.common.errors.UnsupportedVersionException
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import java.util.concurrent.ExecutionException
+
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT)
+@Tag("integration")
+class MetadataQuorumCommandTest(cluster: ClusterInstance) {
+
+ /**
+ * 1. The same number of broker controllers
+ * 2. More brokers than controllers
+ * 3. Fewer brokers than controllers
+ */
+ @ClusterTests(
+ Array(
+ new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3),
+ new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3),
+ new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2),
+ new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2),
+ new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3),
+ new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3)
+ ))
+ def testDescribeQuorumReplicationSuccessful(): Unit = {
+ cluster.waitForReadyBrokers()
+ val describeOutput = TestUtils.grabConsoleOutput(
+ MetadataQuorumCommand.mainNoExit(
+ Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication"))
+ )
+
+ val leaderPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Leader\s+""".r
+ val followerPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Follower\s+""".r
+ val observerPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Observer\s+""".r
+ val outputs = describeOutput.split("\n").tail
+ if (cluster.config().clusterType() == Type.CO_KRAFT) {
+ assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), outputs.length)
+ } else {
+ assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.length)
+ }
+ // `matches` is not supported in scala 2.12, use `findFirstIn` instead.
+ assertTrue(leaderPattern.findFirstIn(outputs.head).nonEmpty)
+ assertEquals(1, outputs.count(leaderPattern.findFirstIn(_).nonEmpty))
+ assertEquals(cluster.config().numControllers() - 1, outputs.count(followerPattern.findFirstIn(_).nonEmpty))
+
+ if (cluster.config().clusterType() == Type.CO_KRAFT) {
+ assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()), outputs.count(observerPattern.findFirstIn(_).nonEmpty))
+ } else {
+ assertEquals(cluster.config().numBrokers(), outputs.count(observerPattern.findFirstIn(_).nonEmpty))
+ }
+ }
+
+ /**
+ * 1. The same number of broker controllers
+ * 2. More brokers than controllers
+ * 3. Fewer brokers than controllers
+ */
+ @ClusterTests(
+ Array(
+ new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3),
+ new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3),
+ new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2),
+ new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2),
+ new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3),
+ new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3)
+ ))
+ def testDescribeQuorumStatusSuccessful(): Unit = {
+ cluster.waitForReadyBrokers()
+ val describeOutput = TestUtils.grabConsoleOutput(
+ MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status"))
+ )
+ val outputs = describeOutput.split("\n")
+
+ assertTrue("""ClusterId:\s+\S{22}""".r.findFirstIn(outputs(0)).nonEmpty)
+ assertTrue("""LeaderId:\s+\d+""".r.findFirstIn(outputs(1)).nonEmpty)
+ assertTrue("""LeaderEpoch:\s+\d+""".r.findFirstIn(outputs(2)).nonEmpty)
+ // HighWatermark may be -1
+ assertTrue("""HighWatermark:\s+[-]?\d+""".r.findFirstIn(outputs(3)).nonEmpty)
+ assertTrue("""MaxFollowerLag:\s+\d+""".r.findFirstIn(outputs(4)).nonEmpty)
+ assertTrue("""MaxFollowerLagTimeMs:\s+[-]?\d+""".r.findFirstIn(outputs(5)).nonEmpty)
+ assertTrue("""CurrentVoters:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(6)).nonEmpty)
+
+ // There are no observers if we have fewer brokers than controllers
+ if (cluster.config().clusterType() == Type.CO_KRAFT
+ && cluster.config().numBrokers() <= cluster.config().numControllers()) {
+ assertTrue("""CurrentObservers:\s+\[\]""".r.findFirstIn(outputs(7)).nonEmpty)
+ } else {
+ assertTrue("""CurrentObservers:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(7)).nonEmpty)
+ }
+ }
+
+ @ClusterTests(
+ Array(new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1),
+ new ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 1)))
+ def testOnlyOneBrokerAndOneController(): Unit = {
+ val statusOutput = TestUtils.grabConsoleOutput(
+ MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status"))
+ )
+ assertEquals("MaxFollowerLag: 0", statusOutput.split("\n")(4))
+ assertEquals("MaxFollowerLagTimeMs: 0", statusOutput.split("\n")(5))
+
+ val replicationOutput = TestUtils.grabConsoleOutput(
+ MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication"))
+ )
+ assertEquals("0", replicationOutput.split("\n")(1).split("\\s+")(2))
+ }
+
+ @ClusterTest(clusterType = Type.ZK, brokers = 3)
+ def testDescribeQuorumInZkMode(): Unit = {
+ assertTrue(
+ assertThrows(
+ classOf[ExecutionException],
+ () =>
+ MetadataQuorumCommand.mainNoExit(
+ Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status"))
+ ).getCause.isInstanceOf[UnsupportedVersionException]
+ )
+ assertTrue(
+ assertThrows(
+ classOf[ExecutionException],
+ () =>
+ MetadataQuorumCommand.mainNoExit(
+ Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication"))
+ ).getCause.isInstanceOf[UnsupportedVersionException]
+ )
+ }
+}
+
+class MetadataQuorumCommandErrorTest {
+
+ @Test
+ def testPropertiesFileDoesNotExists(): Unit = {
+ assertEquals(1,
+ MetadataQuorumCommand.mainNoExit(
+ Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe")))
+ assertEquals(
+ "Properties file admin.properties does not exists!",
+ TestUtils
+ .grabConsoleError(
+ MetadataQuorumCommand.mainNoExit(
+ Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe")))
+ .trim
+ )
+ }
+
+ @Test
+ def testDescribeOptions(): Unit = {
+ assertEquals(1, MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "describe")))
+ assertEquals(
+ "One of --status or --replication must be specified with describe sub-command",
+ TestUtils
+ .grabConsoleError(MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "describe")))
+ .trim
+ )
+
+ assertEquals(1,
+ MetadataQuorumCommand.mainNoExit(
+ Array("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication")))
+ assertEquals(
+ "Only one of --status or --replication should be specified with describe sub-command",
+ TestUtils
+ .grabConsoleError(
+ MetadataQuorumCommand.mainNoExit(
+ Array("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication")))
+ .trim
+ )
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index eed58961e4..28a6f80123 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -74,6 +74,8 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
val leaderId = partitionData.leaderId
assertTrue(leaderId > 0)
+ assertTrue(partitionData.leaderEpoch() > 0)
+ assertTrue(partitionData.highWatermark() > 0)
val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
.getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java
similarity index 61%
rename from tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
rename to server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java
index 3a80b5811f..0c923cd66c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java
@@ -14,13 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.tools;
+package org.apache.kafka.server.util;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.stream.Collectors;
public class ToolsUtils {
@@ -52,4 +56,49 @@ public class ToolsUtils {
}
}
}
+
+ private static void appendColumnValue(
+ StringBuilder rowBuilder,
+ String value,
+ int length
+ ) {
+ int padLength = length - value.length();
+ rowBuilder.append(value);
+ for (int i = 0; i < padLength; i++)
+ rowBuilder.append(' ');
+ }
+
+ private static void printRow(
+ List<Integer> columnLengths,
+ String[] row,
+ PrintStream out
+ ) {
+ StringBuilder rowBuilder = new StringBuilder();
+ for (int i = 0; i < row.length; i++) {
+ Integer columnLength = columnLengths.get(i);
+ String columnValue = row[i];
+ appendColumnValue(rowBuilder, columnValue, columnLength);
+ rowBuilder.append('\t');
+ }
+ out.println(rowBuilder);
+ }
+
+ public static void prettyPrintTable(
+ String[] headers,
+ List<String[]> rows,
+ PrintStream out
+ ) {
+ List<Integer> columnLengths = Arrays.stream(headers)
+ .map(String::length)
+ .collect(Collectors.toList());
+
+ for (String[] row : rows) {
+ for (int i = 0; i < headers.length; i++) {
+ columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length()));
+ }
+ }
+
+ printRow(columnLengths, headers, out);
+ rows.forEach(row -> printRow(columnLengths, row, out));
+ }
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 6967a16fa6..f2ee53cb3f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -43,6 +43,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.ToolsUtils;
public class ProducerPerformance {
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index 92e713ac3b..194524d265 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -66,6 +66,7 @@ import java.util.stream.Collectors;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static org.apache.kafka.server.util.ToolsUtils.prettyPrintTable;
public abstract class TransactionsCommand {
private static final Logger log = LoggerFactory.getLogger(TransactionsCommand.class);
@@ -903,51 +904,6 @@ public abstract class TransactionsCommand {
}
}
- private static void appendColumnValue(
- StringBuilder rowBuilder,
- String value,
- int length
- ) {
- int padLength = length - value.length();
- rowBuilder.append(value);
- for (int i = 0; i < padLength; i++)
- rowBuilder.append(' ');
- }
-
- private static void printRow(
- List<Integer> columnLengths,
- String[] row,
- PrintStream out
- ) {
- StringBuilder rowBuilder = new StringBuilder();
- for (int i = 0; i < row.length; i++) {
- Integer columnLength = columnLengths.get(i);
- String columnValue = row[i];
- appendColumnValue(rowBuilder, columnValue, columnLength);
- rowBuilder.append('\t');
- }
- out.println(rowBuilder);
- }
-
- private static void prettyPrintTable(
- String[] headers,
- List<String[]> rows,
- PrintStream out
- ) {
- List<Integer> columnLengths = Arrays.stream(headers)
- .map(String::length)
- .collect(Collectors.toList());
-
- for (String[] row : rows) {
- for (int i = 0; i < headers.length; i++) {
- columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length()));
- }
- }
-
- printRow(columnLengths, headers, out);
- rows.forEach(row -> printRow(columnLengths, row, out));
- }
-
private static void printErrorAndExit(String message, Throwable t) {
log.debug(message, t);