You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2023/05/04 10:00:42 UTC

[kafka] branch trunk updated: KAFKA-14594: Move LogDirsCommand to tools module (#13122)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dc7819d7f1f KAFKA-14594: Move LogDirsCommand to tools module (#13122)
dc7819d7f1f is described below

commit dc7819d7f1fe6b0160cd95246420ab10c335410b
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Thu May 4 11:00:33 2023 +0100

    KAFKA-14594: Move LogDirsCommand to tools module (#13122)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 bin/kafka-log-dirs.sh                              |   2 +-
 bin/windows/kafka-log-dirs.bat                     |   2 +-
 .../kafka/clients/admin/MockAdminClient.java       |  32 ++-
 .../main/scala/kafka/admin/LogDirsCommand.scala    | 132 -------------
 .../unit/kafka/admin/LogDirsCommandTest.scala      |  76 -------
 .../org/apache/kafka/tools/LogDirsCommand.java     | 220 +++++++++++++++++++++
 .../org/apache/kafka/tools/LogDirsCommandTest.java | 114 +++++++++++
 7 files changed, 367 insertions(+), 211 deletions(-)

diff --git a/bin/kafka-log-dirs.sh b/bin/kafka-log-dirs.sh
index dc16edcc7c5..9894d695cc4 100755
--- a/bin/kafka-log-dirs.sh
+++ b/bin/kafka-log-dirs.sh
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-exec $(dirname $0)/kafka-run-class.sh kafka.admin.LogDirsCommand "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.LogDirsCommand "$@"
diff --git a/bin/windows/kafka-log-dirs.bat b/bin/windows/kafka-log-dirs.bat
index b490d47feae..850003c6038 100644
--- a/bin/windows/kafka-log-dirs.bat
+++ b/bin/windows/kafka-log-dirs.bat
@@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 rem See the License for the specific language governing permissions and
 rem limitations under the License.
 
-"%~dp0kafka-run-class.bat" kafka.admin.LogDirsCommand %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.LogDirsCommand %*
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 7a32bcf5a16..7da22cc787d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -945,7 +945,37 @@ public class MockAdminClient extends AdminClient {
     @Override
     synchronized public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers,
                                                               DescribeLogDirsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        Map<Integer, Map<String, LogDirDescription>> unwrappedResults = new HashMap<>();
+
+        for (Integer broker : brokers) {
+            unwrappedResults.putIfAbsent(broker, new HashMap<>());
+        }
+
+        for (Map.Entry<String, TopicMetadata> entry : allTopics.entrySet()) {
+            String topicName = entry.getKey();
+            TopicMetadata topicMetadata = entry.getValue();
+            // For tests, we make the assumption that there will always be only 1 entry.
+            List<String> partitionLogDirs = topicMetadata.partitionLogDirs;
+            List<TopicPartitionInfo> topicPartitionInfos = topicMetadata.partitions;
+            for (TopicPartitionInfo topicPartitionInfo : topicPartitionInfos) {
+                List<Node> nodes = topicPartitionInfo.replicas();
+                for (Node node : nodes) {
+                    Map<String, LogDirDescription> logDirDescriptionMap = unwrappedResults.get(node.id());
+                    LogDirDescription logDirDescription = logDirDescriptionMap.getOrDefault(partitionLogDirs.get(0), new LogDirDescription(null, new HashMap<>()));
+                    logDirDescription.replicaInfos().put(new TopicPartition(topicName, topicPartitionInfo.partition()), new ReplicaInfo(0, 0, false));
+                }
+            }
+        }
+
+        Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> results = new HashMap<>();
+
+        for (Map.Entry<Integer, Map<String, LogDirDescription>> entry : unwrappedResults.entrySet()) {
+            KafkaFutureImpl<Map<String, LogDirDescription>> kafkaFuture = new KafkaFutureImpl<>();
+            kafkaFuture.complete(entry.getValue());
+            results.put(entry.getKey(), kafkaFuture);
+        }
+
+        return new DescribeLogDirsResult(results);
     }
 
     @Override
diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
deleted file mode 100644
index 870e6a17ba1..00000000000
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.admin
-import java.io.PrintStream
-import java.util.Properties
-import kafka.utils.Json
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, LogDirDescription}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
-
-import scala.jdk.CollectionConverters._
-import scala.collection.Map
-
-/**
-  * A command for querying log directory usage on the specified brokers
-  */
-object LogDirsCommand {
-
-    def main(args: Array[String]): Unit = {
-        describe(args, System.out)
-    }
-
-    def describe(args: Array[String], out: PrintStream): Unit = {
-        val opts = new LogDirsCommandOptions(args)
-        val adminClient = createAdminClient(opts)
-        try {
-            val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
-            val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
-            val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-                case Some(brokerListStr) =>
-                    val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
-                    (inputBrokers.intersect(clusterBrokers), inputBrokers.diff(clusterBrokers))
-                case None => (clusterBrokers, Set.empty)
-            }
-
-            if (nonExistingBrokers.nonEmpty) {
-                out.println(s"ERROR: The given brokers do not exist from --broker-list: ${nonExistingBrokers.mkString(",")}." +
-                  s" Current existent brokers: ${clusterBrokers.mkString(",")}")
-            } else {
-                out.println("Querying brokers for log directories information")
-                val describeLogDirsResult = adminClient.describeLogDirs(existingBrokers.map(Integer.valueOf).toSeq.asJava)
-                val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
-
-                out.println(s"Received log directory information from brokers ${existingBrokers.mkString(",")}")
-                out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
-            }
-        } finally {
-            adminClient.close()
-        }
-    }
-
-    private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirDescription]], topicSet: Set[String]): String = {
-        Json.encodeAsString(Map(
-            "version" -> 1,
-            "brokers" -> logDirInfosByBroker.map { case (broker, logDirInfos) =>
-                Map(
-                    "broker" -> broker,
-                    "logDirs" -> logDirInfos.map { case (logDir, logDirInfo) =>
-                        Map(
-                            "logDir" -> logDir,
-                            "error" -> Option(logDirInfo.error).map(ex => ex.getClass.getName).orNull,
-                            "partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, _) =>
-                                topicSet.isEmpty || topicSet.contains(topicPartition.topic)
-                            }.map { case (topicPartition, replicaInfo) =>
-                                Map(
-                                    "partition" -> topicPartition.toString,
-                                    "size" -> replicaInfo.size,
-                                    "offsetLag" -> replicaInfo.offsetLag,
-                                    "isFuture" -> replicaInfo.isFuture
-                                ).asJava
-                            }.asJava
-                        ).asJava
-                    }.asJava
-                ).asJava
-            }.asJava
-        ).asJava)
-    }
-
-    private def createAdminClient(opts: LogDirsCommandOptions): Admin = {
-        val props = if (opts.options.has(opts.commandConfigOpt))
-            Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
-        else
-            new Properties()
-        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
-        props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool")
-        Admin.create(props)
-    }
-
-    class LogDirsCommandOptions(args: Array[String]) extends CommandDefaultOptions(args){
-        val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping")
-          .withRequiredArg
-          .describedAs("The server(s) to use for bootstrapping")
-          .ofType(classOf[String])
-        val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
-          .withRequiredArg
-          .describedAs("Admin client property file")
-          .ofType(classOf[String])
-        val describeOpt = parser.accepts("describe", "Describe the specified log directories on the specified brokers.")
-        val topicListOpt = parser.accepts("topic-list", "The list of topics to be queried in the form \"topic1,topic2,topic3\". " +
-          "All topics will be queried if no topic list is specified")
-          .withRequiredArg
-          .describedAs("Topic list")
-          .defaultsTo("")
-          .ofType(classOf[String])
-        val brokerListOpt = parser.accepts("broker-list", "The list of brokers to be queried in the form \"0,1,2\". " +
-          "All brokers in the cluster will be queried if no broker list is specified")
-          .withRequiredArg
-          .describedAs("Broker list")
-          .ofType(classOf[String])
-
-        options = parser.parse(args : _*)
-
-        CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to query log directory usage on the specified brokers.")
-
-        CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, describeOpt)
-    }
-}
diff --git a/core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
deleted file mode 100644
index 03e9c1785ac..00000000000
--- a/core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.admin
-
-import java.io.{ByteArrayOutputStream, PrintStream}
-import java.nio.charset.StandardCharsets
-
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.Test
-
-import scala.collection.Seq
-
-class LogDirsCommandTest extends KafkaServerTestHarness {
-
-  def generateConfigs: Seq[KafkaConfig] = {
-    TestUtils.createBrokerConfigs(1, zkConnect)
-      .map(KafkaConfig.fromProps)
-  }
-
-  @Test
-  def checkLogDirsCommandOutput(): Unit = {
-    val byteArrayOutputStream = new ByteArrayOutputStream
-    val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
-    //input exist brokerList
-    LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0", "--describe"), printStream)
-    val existingBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
-    val existingBrokersLineIter = existingBrokersContent.split("\n").iterator
-
-    assertTrue(existingBrokersLineIter.hasNext)
-    assertTrue(existingBrokersLineIter.next().contains(s"Querying brokers for log directories information"))
-
-    //input nonexistent brokerList
-    byteArrayOutputStream.reset()
-    LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0,1,2", "--describe"), printStream)
-    val nonExistingBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
-    val nonExistingBrokersLineIter = nonExistingBrokersContent.split("\n").iterator
-
-    assertTrue(nonExistingBrokersLineIter.hasNext)
-    assertTrue(nonExistingBrokersLineIter.next().contains(s"ERROR: The given brokers do not exist from --broker-list: 1,2. Current existent brokers: 0"))
-
-    //input duplicate ids
-    byteArrayOutputStream.reset()
-    LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0,0,1,2,2", "--describe"), printStream)
-    val duplicateBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
-    val duplicateBrokersLineIter = duplicateBrokersContent.split("\n").iterator
-
-    assertTrue(duplicateBrokersLineIter.hasNext)
-    assertTrue(duplicateBrokersLineIter.next().contains(s"ERROR: The given brokers do not exist from --broker-list: 1,2. Current existent brokers: 0"))
-
-    //use all brokerList for current cluster
-    byteArrayOutputStream.reset()
-    LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--describe"), printStream)
-    val allBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
-    val allBrokersLineIter = allBrokersContent.split("\n").iterator
-
-    assertTrue(allBrokersLineIter.hasNext)
-    assertTrue(allBrokersLineIter.next().contains(s"Querying brokers for log directories information"))
-  }
-}
diff --git a/tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java b/tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java
new file mode 100644
index 00000000000..4e6f0d3397a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class LogDirsCommand {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    private static void execute(String... args) throws Exception {
+        LogDirsCommandOptions options = new LogDirsCommandOptions(args);
+        try (Admin adminClient = createAdminClient(options)) {
+            execute(options, adminClient);
+        }
+    }
+
+    static void execute(LogDirsCommandOptions options, Admin adminClient) throws Exception {
+        Set<String> topics = options.topics();
+        Set<Integer> clusterBrokers = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet());
+        Set<Integer> inputBrokers = options.brokers();
+        Set<Integer> existingBrokers = inputBrokers.isEmpty() ? new HashSet<>(clusterBrokers) : new HashSet<>(inputBrokers);
+        existingBrokers.retainAll(clusterBrokers);
+        Set<Integer> nonExistingBrokers = new HashSet<>(inputBrokers);
+        nonExistingBrokers.removeAll(clusterBrokers);
+
+        if (!nonExistingBrokers.isEmpty()) {
+            throw new TerseException(
+                    String.format(
+                            "ERROR: The given brokers do not exist from --broker-list: %s. Current existent brokers: %s",
+                            commaDelimitedStringFromIntegerSet(nonExistingBrokers),
+                            commaDelimitedStringFromIntegerSet(clusterBrokers)));
+        } else {
+            System.out.println("Querying brokers for log directories information");
+            DescribeLogDirsResult describeLogDirsResult = adminClient.describeLogDirs(existingBrokers);
+            Map<Integer, Map<String, LogDirDescription>> logDirInfosByBroker = describeLogDirsResult.allDescriptions().get();
+
+            System.out.printf(
+                    "Received log directory information from brokers %s%n",
+                    commaDelimitedStringFromIntegerSet(existingBrokers));
+            System.out.println(formatAsJson(logDirInfosByBroker, topics));
+        }
+    }
+
+    private static String commaDelimitedStringFromIntegerSet(Set<Integer> set) {
+        return set.stream().map(String::valueOf).collect(Collectors.joining(","));
+    }
+
+    private static List<Map<String, Object>> fromReplicasInfoToPrintableRepresentation(Map<TopicPartition, ReplicaInfo> replicasInfo) {
+        return replicasInfo.entrySet().stream().map(entry -> {
+            TopicPartition topicPartition = entry.getKey();
+            return new HashMap<String, Object>() {{
+                    put("partition", topicPartition.toString());
+                    put("size", entry.getValue().size());
+                    put("offsetLag", entry.getValue().offsetLag());
+                    put("isFuture", entry.getValue().isFuture());
+                }};
+        }).collect(Collectors.toList());
+    }
+
+    private static List<Map<String, Object>> fromLogDirInfosToPrintableRepresentation(Map<String, LogDirDescription> logDirInfos, Set<String> topicSet) {
+        return logDirInfos.entrySet().stream().map(entry -> {
+            String logDir = entry.getKey();
+            return new HashMap<String, Object>() {{
+                    put("logDir", logDir);
+                    put("error", entry.getValue().error() != null ? entry.getValue().error().getClass().getName() : null);
+                    put("partitions", fromReplicasInfoToPrintableRepresentation(
+                            entry.getValue().replicaInfos().entrySet().stream().filter(entry -> {
+                                TopicPartition topicPartition = entry.getKey();
+                                return topicSet.isEmpty() || topicSet.contains(topicPartition.topic());
+                            }).collect(Collectors.toMap(Entry::getKey, Entry::getValue))
+                    ));
+                }};
+        }).collect(Collectors.toList());
+    }
+
+    private static String formatAsJson(Map<Integer, Map<String, LogDirDescription>> logDirInfosByBroker, Set<String> topicSet) throws JsonProcessingException {
+        return new ObjectMapper().writeValueAsString(new HashMap<String, Object>() {{
+                put("version", 1);
+                put("brokers", logDirInfosByBroker.entrySet().stream().map(entry -> {
+                    int broker = entry.getKey();
+                    Map<String, LogDirDescription> logDirInfos = entry.getValue();
+                    return new HashMap<String, Object>() {{
+                            put("broker", broker);
+                            put("logDirs", fromLogDirInfosToPrintableRepresentation(logDirInfos, topicSet));
+                        }};
+                }).collect(Collectors.toList()));
+            }});
+    }
+
+    private static Admin createAdminClient(LogDirsCommandOptions options) throws IOException {
+        Properties props = new Properties();
+        if (options.hasCommandConfig()) {
+            props.putAll(Utils.loadProps(options.commandConfig()));
+        }
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, options.bootstrapServers());
+        props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool");
+        return Admin.create(props);
+    }
+
+    // Visible for testing
+    static class LogDirsCommandOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> commandConfigOpt;
+        private final OptionSpecBuilder describeOpt;
+        private final OptionSpec<String> topicListOpt;
+        private final OptionSpec<String> brokerListOpt;
+
+        public LogDirsCommandOptions(String... args) {
+            super(args);
+
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping")
+                    .withRequiredArg()
+                    .describedAs("The server(s) to use for bootstrapping")
+                    .ofType(String.class);
+            commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                    .withRequiredArg()
+                    .describedAs("Admin client property file")
+                    .ofType(String.class);
+            describeOpt = parser.accepts("describe", "Describe the specified log directories on the specified brokers.");
+            topicListOpt = parser.accepts("topic-list", "The list of topics to be queried in the form \"topic1,topic2,topic3\". " +
+                            "All topics will be queried if no topic list is specified")
+                    .withRequiredArg()
+                    .describedAs("Topic list")
+                    .defaultsTo("")
+                    .ofType(String.class);
+            brokerListOpt = parser.accepts("broker-list", "The list of brokers to be queried in the form \"0,1,2\". " +
+                            "All brokers in the cluster will be queried if no broker list is specified")
+                    .withRequiredArg()
+                    .describedAs("Broker list")
+                    .ofType(String.class)
+                    .defaultsTo("");
+
+            options = parser.parse(args);
+
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to query log directory usage on the specified brokers.");
+
+            CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, describeOpt);
+        }
+
+        private Stream<String> splitAtCommasAndFilterOutEmpty(OptionSpec<String> option) {
+            return Arrays.stream(options.valueOf(option).split(",")).filter(x -> !x.isEmpty());
+        }
+
+        private String bootstrapServers() {
+            return options.valueOf(bootstrapServerOpt);
+        }
+
+        private boolean hasCommandConfig() {
+            return options.has(commandConfigOpt);
+        }
+
+        private String commandConfig() {
+            return options.valueOf(commandConfigOpt);
+        }
+
+        private Set<String> topics() {
+            return splitAtCommasAndFilterOutEmpty(topicListOpt).collect(Collectors.toSet());
+        }
+
+        private Set<Integer> brokers() {
+            return splitAtCommasAndFilterOutEmpty(brokerListOpt).map(Integer::valueOf).collect(Collectors.toSet());
+        }
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java
new file mode 100644
index 00000000000..4dfd36896c6
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class LogDirsCommandTest {
+
+    @Test
+    public void shouldThrowWhenQueryingNonExistentBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldNotThrowWhenDuplicatedBrokers() throws JsonProcessingException {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1,1", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertEquals(3, standardOutputLines.length);
+            Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+            List<Object> brokersInformation = (List<Object>) information.get("brokers");
+            Integer brokerId = (Integer) ((HashMap<String, Object>) brokersInformation.get(0)).get("broker");
+            assertEquals(1, brokersInformation.size());
+            assertEquals(1, brokerId);
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldQueryAllBrokersIfNonSpecified() throws JsonProcessingException {
+        Node brokerOne = new Node(1, "hostname", 9092);
+        Node brokerTwo = new Node(2, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Arrays.asList(brokerTwo, brokerOne), brokerOne)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertEquals(3, standardOutputLines.length);
+            Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+            List<Object> brokersInformation = (List<Object>) information.get("brokers");
+            Set<Integer> brokerIds = new HashSet<Integer>() {{
+                    add((Integer) ((HashMap<String, Object>) brokersInformation.get(0)).get("broker"));
+                    add((Integer) ((HashMap<String, Object>) brokersInformation.get(1)).get("broker"));
+                }};
+            assertEquals(2, brokersInformation.size());
+            assertEquals(new HashSet<>(Arrays.asList(2, 1)), brokerIds);
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldQuerySpecifiedBroker() throws JsonProcessingException {
+        Node brokerOne = new Node(1, "hostname", 9092);
+        Node brokerTwo = new Node(2, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Arrays.asList(brokerOne, brokerTwo), brokerOne)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertEquals(3, standardOutputLines.length);
+            Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+            List<Object> brokersInformation = (List<Object>) information.get("brokers");
+            Integer brokerId = (Integer) ((HashMap<String, Object>) brokersInformation.get(0)).get("broker");
+            assertEquals(1, brokersInformation.size());
+            assertEquals(1, brokerId);
+        }
+    }
+
+    private LogDirsCommand.LogDirsCommandOptions fromArgsToOptions(String... args) {
+        return new LogDirsCommand.LogDirsCommandOptions(args);
+    }
+
+    private String execute(LogDirsCommand.LogDirsCommandOptions options, Admin adminClient) {
+        Runnable runnable = () -> {
+            try {
+                LogDirsCommand.execute(options, adminClient);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+        return ToolsTestUtils.captureStandardOut(runnable);
+    }
+}