You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2023/05/04 10:00:42 UTC
[kafka] branch trunk updated: KAFKA-14594: Move LogDirsCommand to tools module (#13122)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dc7819d7f1f KAFKA-14594: Move LogDirsCommand to tools module (#13122)
dc7819d7f1f is described below
commit dc7819d7f1fe6b0160cd95246420ab10c335410b
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Thu May 4 11:00:33 2023 +0100
KAFKA-14594: Move LogDirsCommand to tools module (#13122)
Reviewers: Mickael Maison <mi...@gmail.com>
---
bin/kafka-log-dirs.sh | 2 +-
bin/windows/kafka-log-dirs.bat | 2 +-
.../kafka/clients/admin/MockAdminClient.java | 32 ++-
.../main/scala/kafka/admin/LogDirsCommand.scala | 132 -------------
.../unit/kafka/admin/LogDirsCommandTest.scala | 76 -------
.../org/apache/kafka/tools/LogDirsCommand.java | 220 +++++++++++++++++++++
.../org/apache/kafka/tools/LogDirsCommandTest.java | 114 +++++++++++
7 files changed, 367 insertions(+), 211 deletions(-)
diff --git a/bin/kafka-log-dirs.sh b/bin/kafka-log-dirs.sh
index dc16edcc7c5..9894d695cc4 100755
--- a/bin/kafka-log-dirs.sh
+++ b/bin/kafka-log-dirs.sh
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-exec $(dirname $0)/kafka-run-class.sh kafka.admin.LogDirsCommand "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.LogDirsCommand "$@"
diff --git a/bin/windows/kafka-log-dirs.bat b/bin/windows/kafka-log-dirs.bat
index b490d47feae..850003c6038 100644
--- a/bin/windows/kafka-log-dirs.bat
+++ b/bin/windows/kafka-log-dirs.bat
@@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
-"%~dp0kafka-run-class.bat" kafka.admin.LogDirsCommand %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.LogDirsCommand %*
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 7a32bcf5a16..7da22cc787d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -945,7 +945,37 @@ public class MockAdminClient extends AdminClient {
@Override
synchronized public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers,
DescribeLogDirsOptions options) {
- throw new UnsupportedOperationException("Not implemented yet");
+ Map<Integer, Map<String, LogDirDescription>> unwrappedResults = new HashMap<>();
+
+ for (Integer broker : brokers) {
+ unwrappedResults.putIfAbsent(broker, new HashMap<>());
+ }
+
+ for (Map.Entry<String, TopicMetadata> entry : allTopics.entrySet()) {
+ String topicName = entry.getKey();
+ TopicMetadata topicMetadata = entry.getValue();
+ // For tests, we make the assumption that there will always be only 1 entry.
+ List<String> partitionLogDirs = topicMetadata.partitionLogDirs;
+ List<TopicPartitionInfo> topicPartitionInfos = topicMetadata.partitions;
+ for (TopicPartitionInfo topicPartitionInfo : topicPartitionInfos) {
+ List<Node> nodes = topicPartitionInfo.replicas();
+ for (Node node : nodes) {
+ Map<String, LogDirDescription> logDirDescriptionMap = unwrappedResults.get(node.id());
+ LogDirDescription logDirDescription = logDirDescriptionMap.getOrDefault(partitionLogDirs.get(0), new LogDirDescription(null, new HashMap<>()));
+ logDirDescription.replicaInfos().put(new TopicPartition(topicName, topicPartitionInfo.partition()), new ReplicaInfo(0, 0, false));
+ }
+ }
+ }
+
+ Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> results = new HashMap<>();
+
+ for (Map.Entry<Integer, Map<String, LogDirDescription>> entry : unwrappedResults.entrySet()) {
+ KafkaFutureImpl<Map<String, LogDirDescription>> kafkaFuture = new KafkaFutureImpl<>();
+ kafkaFuture.complete(entry.getValue());
+ results.put(entry.getKey(), kafkaFuture);
+ }
+
+ return new DescribeLogDirsResult(results);
}
@Override
diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
deleted file mode 100644
index 870e6a17ba1..00000000000
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-import java.io.PrintStream
-import java.util.Properties
-import kafka.utils.Json
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, LogDirDescription}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
-
-import scala.jdk.CollectionConverters._
-import scala.collection.Map
-
-/**
- * A command for querying log directory usage on the specified brokers
- */
-object LogDirsCommand {
-
- def main(args: Array[String]): Unit = {
- describe(args, System.out)
- }
-
- def describe(args: Array[String], out: PrintStream): Unit = {
- val opts = new LogDirsCommandOptions(args)
- val adminClient = createAdminClient(opts)
- try {
- val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
- val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
- val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
- case Some(brokerListStr) =>
- val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
- (inputBrokers.intersect(clusterBrokers), inputBrokers.diff(clusterBrokers))
- case None => (clusterBrokers, Set.empty)
- }
-
- if (nonExistingBrokers.nonEmpty) {
- out.println(s"ERROR: The given brokers do not exist from --broker-list: ${nonExistingBrokers.mkString(",")}." +
- s" Current existent brokers: ${clusterBrokers.mkString(",")}")
- } else {
- out.println("Querying brokers for log directories information")
- val describeLogDirsResult = adminClient.describeLogDirs(existingBrokers.map(Integer.valueOf).toSeq.asJava)
- val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
-
- out.println(s"Received log directory information from brokers ${existingBrokers.mkString(",")}")
- out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
- }
- } finally {
- adminClient.close()
- }
- }
-
- private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirDescription]], topicSet: Set[String]): String = {
- Json.encodeAsString(Map(
- "version" -> 1,
- "brokers" -> logDirInfosByBroker.map { case (broker, logDirInfos) =>
- Map(
- "broker" -> broker,
- "logDirs" -> logDirInfos.map { case (logDir, logDirInfo) =>
- Map(
- "logDir" -> logDir,
- "error" -> Option(logDirInfo.error).map(ex => ex.getClass.getName).orNull,
- "partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, _) =>
- topicSet.isEmpty || topicSet.contains(topicPartition.topic)
- }.map { case (topicPartition, replicaInfo) =>
- Map(
- "partition" -> topicPartition.toString,
- "size" -> replicaInfo.size,
- "offsetLag" -> replicaInfo.offsetLag,
- "isFuture" -> replicaInfo.isFuture
- ).asJava
- }.asJava
- ).asJava
- }.asJava
- ).asJava
- }.asJava
- ).asJava)
- }
-
- private def createAdminClient(opts: LogDirsCommandOptions): Admin = {
- val props = if (opts.options.has(opts.commandConfigOpt))
- Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
- else
- new Properties()
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
- props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool")
- Admin.create(props)
- }
-
- class LogDirsCommandOptions(args: Array[String]) extends CommandDefaultOptions(args){
- val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping")
- .withRequiredArg
- .describedAs("The server(s) to use for bootstrapping")
- .ofType(classOf[String])
- val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
- .withRequiredArg
- .describedAs("Admin client property file")
- .ofType(classOf[String])
- val describeOpt = parser.accepts("describe", "Describe the specified log directories on the specified brokers.")
- val topicListOpt = parser.accepts("topic-list", "The list of topics to be queried in the form \"topic1,topic2,topic3\". " +
- "All topics will be queried if no topic list is specified")
- .withRequiredArg
- .describedAs("Topic list")
- .defaultsTo("")
- .ofType(classOf[String])
- val brokerListOpt = parser.accepts("broker-list", "The list of brokers to be queried in the form \"0,1,2\". " +
- "All brokers in the cluster will be queried if no broker list is specified")
- .withRequiredArg
- .describedAs("Broker list")
- .ofType(classOf[String])
-
- options = parser.parse(args : _*)
-
- CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to query log directory usage on the specified brokers.")
-
- CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, describeOpt)
- }
-}
diff --git a/core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
deleted file mode 100644
index 03e9c1785ac..00000000000
--- a/core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.admin
-
-import java.io.{ByteArrayOutputStream, PrintStream}
-import java.nio.charset.StandardCharsets
-
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.Test
-
-import scala.collection.Seq
-
-class LogDirsCommandTest extends KafkaServerTestHarness {
-
- def generateConfigs: Seq[KafkaConfig] = {
- TestUtils.createBrokerConfigs(1, zkConnect)
- .map(KafkaConfig.fromProps)
- }
-
- @Test
- def checkLogDirsCommandOutput(): Unit = {
- val byteArrayOutputStream = new ByteArrayOutputStream
- val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
- //input exist brokerList
- LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0", "--describe"), printStream)
- val existingBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
- val existingBrokersLineIter = existingBrokersContent.split("\n").iterator
-
- assertTrue(existingBrokersLineIter.hasNext)
- assertTrue(existingBrokersLineIter.next().contains(s"Querying brokers for log directories information"))
-
- //input nonexistent brokerList
- byteArrayOutputStream.reset()
- LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0,1,2", "--describe"), printStream)
- val nonExistingBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
- val nonExistingBrokersLineIter = nonExistingBrokersContent.split("\n").iterator
-
- assertTrue(nonExistingBrokersLineIter.hasNext)
- assertTrue(nonExistingBrokersLineIter.next().contains(s"ERROR: The given brokers do not exist from --broker-list: 1,2. Current existent brokers: 0"))
-
- //input duplicate ids
- byteArrayOutputStream.reset()
- LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0,0,1,2,2", "--describe"), printStream)
- val duplicateBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
- val duplicateBrokersLineIter = duplicateBrokersContent.split("\n").iterator
-
- assertTrue(duplicateBrokersLineIter.hasNext)
- assertTrue(duplicateBrokersLineIter.next().contains(s"ERROR: The given brokers do not exist from --broker-list: 1,2. Current existent brokers: 0"))
-
- //use all brokerList for current cluster
- byteArrayOutputStream.reset()
- LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--describe"), printStream)
- val allBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
- val allBrokersLineIter = allBrokersContent.split("\n").iterator
-
- assertTrue(allBrokersLineIter.hasNext)
- assertTrue(allBrokersLineIter.next().contains(s"Querying brokers for log directories information"))
- }
-}
diff --git a/tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java b/tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java
new file mode 100644
index 00000000000..4e6f0d3397a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class LogDirsCommand {
+
+ public static void main(String... args) {
+ Exit.exit(mainNoExit(args));
+ }
+
+ static int mainNoExit(String... args) {
+ try {
+ execute(args);
+ return 0;
+ } catch (TerseException e) {
+ System.err.println(e.getMessage());
+ return 1;
+ } catch (Throwable e) {
+ System.err.println(e.getMessage());
+ System.err.println(Utils.stackTrace(e));
+ return 1;
+ }
+ }
+
+ private static void execute(String... args) throws Exception {
+ LogDirsCommandOptions options = new LogDirsCommandOptions(args);
+ try (Admin adminClient = createAdminClient(options)) {
+ execute(options, adminClient);
+ }
+ }
+
+ static void execute(LogDirsCommandOptions options, Admin adminClient) throws Exception {
+ Set<String> topics = options.topics();
+ Set<Integer> clusterBrokers = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet());
+ Set<Integer> inputBrokers = options.brokers();
+ Set<Integer> existingBrokers = inputBrokers.isEmpty() ? new HashSet<>(clusterBrokers) : new HashSet<>(inputBrokers);
+ existingBrokers.retainAll(clusterBrokers);
+ Set<Integer> nonExistingBrokers = new HashSet<>(inputBrokers);
+ nonExistingBrokers.removeAll(clusterBrokers);
+
+ if (!nonExistingBrokers.isEmpty()) {
+ throw new TerseException(
+ String.format(
+ "ERROR: The given brokers do not exist from --broker-list: %s. Current existent brokers: %s",
+ commaDelimitedStringFromIntegerSet(nonExistingBrokers),
+ commaDelimitedStringFromIntegerSet(clusterBrokers)));
+ } else {
+ System.out.println("Querying brokers for log directories information");
+ DescribeLogDirsResult describeLogDirsResult = adminClient.describeLogDirs(existingBrokers);
+ Map<Integer, Map<String, LogDirDescription>> logDirInfosByBroker = describeLogDirsResult.allDescriptions().get();
+
+ System.out.printf(
+ "Received log directory information from brokers %s%n",
+ commaDelimitedStringFromIntegerSet(existingBrokers));
+ System.out.println(formatAsJson(logDirInfosByBroker, topics));
+ }
+ }
+
+ private static String commaDelimitedStringFromIntegerSet(Set<Integer> set) {
+ return set.stream().map(String::valueOf).collect(Collectors.joining(","));
+ }
+
+ private static List<Map<String, Object>> fromReplicasInfoToPrintableRepresentation(Map<TopicPartition, ReplicaInfo> replicasInfo) {
+ return replicasInfo.entrySet().stream().map(entry -> {
+ TopicPartition topicPartition = entry.getKey();
+ return new HashMap<String, Object>() {{
+ put("partition", topicPartition.toString());
+ put("size", entry.getValue().size());
+ put("offsetLag", entry.getValue().offsetLag());
+ put("isFuture", entry.getValue().isFuture());
+ }};
+ }).collect(Collectors.toList());
+ }
+
+ private static List<Map<String, Object>> fromLogDirInfosToPrintableRepresentation(Map<String, LogDirDescription> logDirInfos, Set<String> topicSet) {
+ return logDirInfos.entrySet().stream().map(entry -> {
+ String logDir = entry.getKey();
+ return new HashMap<String, Object>() {{
+ put("logDir", logDir);
+ put("error", entry.getValue().error() != null ? entry.getValue().error().getClass().getName() : null);
+ put("partitions", fromReplicasInfoToPrintableRepresentation(
+ entry.getValue().replicaInfos().entrySet().stream().filter(entry -> {
+ TopicPartition topicPartition = entry.getKey();
+ return topicSet.isEmpty() || topicSet.contains(topicPartition.topic());
+ }).collect(Collectors.toMap(Entry::getKey, Entry::getValue))
+ ));
+ }};
+ }).collect(Collectors.toList());
+ }
+
+ private static String formatAsJson(Map<Integer, Map<String, LogDirDescription>> logDirInfosByBroker, Set<String> topicSet) throws JsonProcessingException {
+ return new ObjectMapper().writeValueAsString(new HashMap<String, Object>() {{
+ put("version", 1);
+ put("brokers", logDirInfosByBroker.entrySet().stream().map(entry -> {
+ int broker = entry.getKey();
+ Map<String, LogDirDescription> logDirInfos = entry.getValue();
+ return new HashMap<String, Object>() {{
+ put("broker", broker);
+ put("logDirs", fromLogDirInfosToPrintableRepresentation(logDirInfos, topicSet));
+ }};
+ }).collect(Collectors.toList()));
+ }});
+ }
+
+ private static Admin createAdminClient(LogDirsCommandOptions options) throws IOException {
+ Properties props = new Properties();
+ if (options.hasCommandConfig()) {
+ props.putAll(Utils.loadProps(options.commandConfig()));
+ }
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, options.bootstrapServers());
+ props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool");
+ return Admin.create(props);
+ }
+
+ // Visible for testing
+ static class LogDirsCommandOptions extends CommandDefaultOptions {
+ private final OptionSpec<String> bootstrapServerOpt;
+ private final OptionSpec<String> commandConfigOpt;
+ private final OptionSpecBuilder describeOpt;
+ private final OptionSpec<String> topicListOpt;
+ private final OptionSpec<String> brokerListOpt;
+
+ public LogDirsCommandOptions(String... args) {
+ super(args);
+
+ bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping")
+ .withRequiredArg()
+ .describedAs("The server(s) to use for bootstrapping")
+ .ofType(String.class);
+ commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+ .withRequiredArg()
+ .describedAs("Admin client property file")
+ .ofType(String.class);
+ describeOpt = parser.accepts("describe", "Describe the specified log directories on the specified brokers.");
+ topicListOpt = parser.accepts("topic-list", "The list of topics to be queried in the form \"topic1,topic2,topic3\". " +
+ "All topics will be queried if no topic list is specified")
+ .withRequiredArg()
+ .describedAs("Topic list")
+ .defaultsTo("")
+ .ofType(String.class);
+ brokerListOpt = parser.accepts("broker-list", "The list of brokers to be queried in the form \"0,1,2\". " +
+ "All brokers in the cluster will be queried if no broker list is specified")
+ .withRequiredArg()
+ .describedAs("Broker list")
+ .ofType(String.class)
+ .defaultsTo("");
+
+ options = parser.parse(args);
+
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to query log directory usage on the specified brokers.");
+
+ CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, describeOpt);
+ }
+
+ private Stream<String> splitAtCommasAndFilterOutEmpty(OptionSpec<String> option) {
+ return Arrays.stream(options.valueOf(option).split(",")).filter(x -> !x.isEmpty());
+ }
+
+ private String bootstrapServers() {
+ return options.valueOf(bootstrapServerOpt);
+ }
+
+ private boolean hasCommandConfig() {
+ return options.has(commandConfigOpt);
+ }
+
+ private String commandConfig() {
+ return options.valueOf(commandConfigOpt);
+ }
+
+ private Set<String> topics() {
+ return splitAtCommasAndFilterOutEmpty(topicListOpt).collect(Collectors.toSet());
+ }
+
+ private Set<Integer> brokers() {
+ return splitAtCommasAndFilterOutEmpty(brokerListOpt).map(Integer::valueOf).collect(Collectors.toSet());
+ }
+ }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java
new file mode 100644
index 00000000000..4dfd36896c6
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class LogDirsCommandTest {
+
+ @Test
+ public void shouldThrowWhenQueryingNonExistentBrokers() {
+ Node broker = new Node(1, "hostname", 9092);
+ try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+ assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldNotThrowWhenDuplicatedBrokers() throws JsonProcessingException {
+ Node broker = new Node(1, "hostname", 9092);
+ try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+ String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1,1", "--describe"), adminClient);
+ String[] standardOutputLines = standardOutput.split("\n");
+ assertEquals(3, standardOutputLines.length);
+ Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+ List<Object> brokersInformation = (List<Object>) information.get("brokers");
+ Integer brokerId = (Integer) ((HashMap<String, Object>) brokersInformation.get(0)).get("broker");
+ assertEquals(1, brokersInformation.size());
+ assertEquals(1, brokerId);
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldQueryAllBrokersIfNonSpecified() throws JsonProcessingException {
+ Node brokerOne = new Node(1, "hostname", 9092);
+ Node brokerTwo = new Node(2, "hostname", 9092);
+ try (MockAdminClient adminClient = new MockAdminClient(Arrays.asList(brokerTwo, brokerOne), brokerOne)) {
+ String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--describe"), adminClient);
+ String[] standardOutputLines = standardOutput.split("\n");
+ assertEquals(3, standardOutputLines.length);
+ Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+ List<Object> brokersInformation = (List<Object>) information.get("brokers");
+ Set<Integer> brokerIds = new HashSet<Integer>() {{
+ add((Integer) ((HashMap<String, Object>) brokersInformation.get(0)).get("broker"));
+ add((Integer) ((HashMap<String, Object>) brokersInformation.get(1)).get("broker"));
+ }};
+ assertEquals(2, brokersInformation.size());
+ assertEquals(new HashSet<>(Arrays.asList(2, 1)), brokerIds);
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldQuerySpecifiedBroker() throws JsonProcessingException {
+ Node brokerOne = new Node(1, "hostname", 9092);
+ Node brokerTwo = new Node(2, "hostname", 9092);
+ try (MockAdminClient adminClient = new MockAdminClient(Arrays.asList(brokerOne, brokerTwo), brokerOne)) {
+ String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1", "--describe"), adminClient);
+ String[] standardOutputLines = standardOutput.split("\n");
+ assertEquals(3, standardOutputLines.length);
+ Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+ List<Object> brokersInformation = (List<Object>) information.get("brokers");
+ Integer brokerId = (Integer) ((HashMap<String, Object>) brokersInformation.get(0)).get("broker");
+ assertEquals(1, brokersInformation.size());
+ assertEquals(1, brokerId);
+ }
+ }
+
+ private LogDirsCommand.LogDirsCommandOptions fromArgsToOptions(String... args) {
+ return new LogDirsCommand.LogDirsCommandOptions(args);
+ }
+
+ private String execute(LogDirsCommand.LogDirsCommandOptions options, Admin adminClient) {
+ Runnable runnable = () -> {
+ try {
+ LogDirsCommand.execute(options, adminClient);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+ return ToolsTestUtils.captureStandardOut(runnable);
+ }
+}