You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/16 13:16:56 UTC

[GitHub] [kafka] clolov opened a new pull request, #13122: KAFKA-14594: Move LogDirsCommand to tools module

clolov opened a new pull request, #13122:
URL: https://github.com/apache/kafka/pull/13122

   Questions for reviewers:
   * Ideally, I would like to move the test using `KafkaServerTestHarness`, but I do not know whether we want to severe all links to Scala by moving the tools?
   * Is there a plan to provide an implementation for `MockAdminClient::describeLogDirs` or should we be extending the class to provide implementation on test files?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072271288


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   Sorry, then may I get a bit of clarification on how do you envision this to work? Do you envision that a Java compatible copy of `CommandDefaultOptions` is written? I am asking this because I was battling with moving ConsoleConsumer and there I ran into the following problem:
   ```
   abstract class CommandDefaultOptions(val args: Array[String], allowCommandOptionAbbreviation: Boolean = false) {
     val parser = new OptionParser(allowCommandOptionAbbreviation)
   ...
   ```
   ```
   private static class ConsumerConfig extends CommandDefaultOptions {
           ConsumerConfig(String... args) {
               super(args, false)
               
               ... = this.parser(); <--- Cannot access joptsimple.OptionParser
   ```
   Is it in general that we are only trying to move the commands from Scala to Java or do we want a complete break from Scala classes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1098445584


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LogDirsCommand {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            LogDirsCommandOptions logDirsCommandOptions = new LogDirsCommandOptions(args);
+            try (Admin adminClient = createAdminClient(logDirsCommandOptions)) {
+                execute(logDirsCommandOptions, adminClient);
+            }
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(LogDirsCommandOptions logDirsCommandOptions, Admin adminClient) throws TerseException, JsonProcessingException, ExecutionException, InterruptedException {

Review Comment:
   I would suggest to simply declare Exception, rather than enumeration every single exception.



##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LogDirsCommand {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            LogDirsCommandOptions logDirsCommandOptions = new LogDirsCommandOptions(args);
+            try (Admin adminClient = createAdminClient(logDirsCommandOptions)) {
+                execute(logDirsCommandOptions, adminClient);
+            }
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(LogDirsCommandOptions logDirsCommandOptions, Admin adminClient) throws TerseException, JsonProcessingException, ExecutionException, InterruptedException {
+        Set<String> topicList = Arrays.stream(logDirsCommandOptions.options.valueOf(logDirsCommandOptions.topicListOpt).split(",")).filter(x -> !x.isEmpty()).collect(Collectors.toSet());
+        Set<Integer> clusterBrokers = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet());
+        Optional<String> brokers = Optional.ofNullable(logDirsCommandOptions.options.valueOf(logDirsCommandOptions.brokerListOpt));
+        Set<Integer> inputBrokers = Arrays.stream(brokers.orElse("").split(",")).filter(x -> !x.isEmpty()).map(Integer::valueOf).collect(Collectors.toSet());
+        Set<Integer> existingBrokers = inputBrokers.isEmpty() ? new HashSet<>(clusterBrokers): new HashSet<>(inputBrokers);
+        existingBrokers.retainAll(clusterBrokers);
+        Set<Integer> nonExistingBrokers = new HashSet<>(inputBrokers);
+        nonExistingBrokers.removeAll(clusterBrokers);
+
+        if (!nonExistingBrokers.isEmpty()) {
+            throw new TerseException(
+                    String.format(
+                            "ERROR: The given brokers do not exist from --broker-list: %s. Current existent brokers: %s\n",
+                            nonExistingBrokers.stream().map(String::valueOf).collect(Collectors.joining(",")),

Review Comment:
   The code `.stream().map(String::valueOf).collect` is repeated various times. Should we create a small utility method to improve readability?



##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LogDirsCommand {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            LogDirsCommandOptions logDirsCommandOptions = new LogDirsCommandOptions(args);

Review Comment:
   Can we move this logic inside the execute command as with the original code and other tools? This would help if we decide to create a command superclass.



##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LogDirsCommand {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            LogDirsCommandOptions logDirsCommandOptions = new LogDirsCommandOptions(args);
+            try (Admin adminClient = createAdminClient(logDirsCommandOptions)) {
+                execute(logDirsCommandOptions, adminClient);
+            }
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(LogDirsCommandOptions logDirsCommandOptions, Admin adminClient) throws TerseException, JsonProcessingException, ExecutionException, InterruptedException {
+        Set<String> topicList = Arrays.stream(logDirsCommandOptions.options.valueOf(logDirsCommandOptions.topicListOpt).split(",")).filter(x -> !x.isEmpty()).collect(Collectors.toSet());

Review Comment:
   In order to improve readability and maintenance, I would suggest to wrap every input/option parsing and transformation logic in a `LogDirsCommandOptions` method. For example, this line would be:
   
   ```sh
   Set<String> topicSet = logDirsCommandOptions.topicSet();
   ```



##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LogDirsCommand {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            LogDirsCommandOptions logDirsCommandOptions = new LogDirsCommandOptions(args);
+            try (Admin adminClient = createAdminClient(logDirsCommandOptions)) {
+                execute(logDirsCommandOptions, adminClient);
+            }
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(LogDirsCommandOptions logDirsCommandOptions, Admin adminClient) throws TerseException, JsonProcessingException, ExecutionException, InterruptedException {
+        Set<String> topicList = Arrays.stream(logDirsCommandOptions.options.valueOf(logDirsCommandOptions.topicListOpt).split(",")).filter(x -> !x.isEmpty()).collect(Collectors.toSet());
+        Set<Integer> clusterBrokers = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet());
+        Optional<String> brokers = Optional.ofNullable(logDirsCommandOptions.options.valueOf(logDirsCommandOptions.brokerListOpt));
+        Set<Integer> inputBrokers = Arrays.stream(brokers.orElse("").split(",")).filter(x -> !x.isEmpty()).map(Integer::valueOf).collect(Collectors.toSet());
+        Set<Integer> existingBrokers = inputBrokers.isEmpty() ? new HashSet<>(clusterBrokers): new HashSet<>(inputBrokers);
+        existingBrokers.retainAll(clusterBrokers);
+        Set<Integer> nonExistingBrokers = new HashSet<>(inputBrokers);
+        nonExistingBrokers.removeAll(clusterBrokers);
+
+        if (!nonExistingBrokers.isEmpty()) {
+            throw new TerseException(
+                    String.format(
+                            "ERROR: The given brokers do not exist from --broker-list: %s. Current existent brokers: %s\n",
+                            nonExistingBrokers.stream().map(String::valueOf).collect(Collectors.joining(",")),
+                            clusterBrokers.stream().map(String::valueOf).collect(Collectors.joining(","))));
+        } else {
+            System.out.println("Querying brokers for log directories information");
+            DescribeLogDirsResult describeLogDirsResult = adminClient.describeLogDirs(existingBrokers);
+            Map<Integer, Map<String, LogDirDescription>> logDirInfosByBroker = describeLogDirsResult.allDescriptions().get();
+
+            System.out.printf(
+                    "Received log directory information from brokers %s\n",
+                    existingBrokers.stream().map(String::valueOf).collect(Collectors.joining(",")));
+            System.out.println(formatAsJson(logDirInfosByBroker, topicList));
+        }
+    }
+
+    private static List<Map<String, Object>> fromReplicasInfoToPrintableRepresentation(Map<TopicPartition, ReplicaInfo> replicasInfo) {
+        return replicasInfo.entrySet().stream().map(entry -> {
+            TopicPartition topicPartition = entry.getKey();
+            ReplicaInfo replicaInfo = entry.getValue();
+            return new HashMap<String, Object>() {{
+                put("partition", topicPartition.toString());
+                put("size", replicaInfo.size());
+                put("offsetLag", replicaInfo.offsetLag());
+                put("isFuture", replicaInfo.isFuture());
+            }};
+        }).collect(Collectors.toList());
+    }
+
+    private static List<Map<String, Object>> fromLogDirInfosToPrintableRepresentation(Map<String, LogDirDescription> logDirInfos, Set<String> topicSet) {
+        return logDirInfos.entrySet().stream().map(entry -> {
+            String logDir = entry.getKey();
+            LogDirDescription logDirInfo = entry.getValue();
+            return new HashMap<String, Object>() {{
+                put("logDir", logDir);
+                put("error", Optional.ofNullable(logDirInfo.error()).map(ex -> ex.getClass().getName()));
+                put("partitions", fromReplicasInfoToPrintableRepresentation(
+                        logDirInfo.replicaInfos().entrySet().stream().filter(entry -> {
+                            TopicPartition topicPartition = entry.getKey();
+                            return topicSet.isEmpty() || topicSet.contains(topicPartition.topic());
+                        }).collect(Collectors.toMap(Entry::getKey, Entry::getValue))
+                ));
+            }};
+        }).collect(Collectors.toList());
+    }
+
+    private static String formatAsJson(Map<Integer, Map<String, LogDirDescription>> logDirInfosByBroker, Set<String> topicSet) throws JsonProcessingException {
+        return new ObjectMapper().writeValueAsString(new HashMap<String, Object>() {{
+            put("version", 1);
+            put("brokers", logDirInfosByBroker.entrySet().stream().map(entry -> {
+                int broker = entry.getKey();
+                Map<String, LogDirDescription> logDirInfos = entry.getValue();
+                return new HashMap<String, Object>() {{
+                    put("broker", broker);
+                    put("logDirs", fromLogDirInfosToPrintableRepresentation(logDirInfos, topicSet));
+                }};
+            }).collect(Collectors.toList()));
+        }});
+    }
+
+    private static Admin createAdminClient(LogDirsCommandOptions logDirsCommandOptions) throws IOException {
+        Properties props = new Properties();
+        if (logDirsCommandOptions.options.has(logDirsCommandOptions.commandConfigOpt)) {
+            Utils.loadProps(logDirsCommandOptions.options.valueOf(logDirsCommandOptions.commandConfigOpt));
+        }
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, logDirsCommandOptions.options.valueOf(logDirsCommandOptions.bootstrapServerOpt));
+        props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool");
+        return Admin.create(props);
+    }
+
+    // Visible for testing
+    static class LogDirsCommandOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> commandConfigOpt;
+        private final OptionSpecBuilder describeOpt;
+        private final OptionSpec<String> topicListOpt;
+        private final OptionSpec<String> brokerListOpt;
+
+        public LogDirsCommandOptions(String... args) {
+            super(args);
+
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping")
+                    .withRequiredArg()
+                    .describedAs("The server(s) to use for bootstrapping")
+                    .ofType(String.class);
+            commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                    .withRequiredArg()
+                    .describedAs("Admin client property file");

Review Comment:
   Missing ofType.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1071392892


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   I think that we should keep using `joptsimple` with the `CommandDefaultOptions` abstract class, rather than migrating all commands to `argparse4j`. The vast majority of commands do not use `argparse4j`.
   
   Then, the `joptsimple` (current) no-args output is very different:
   
   ```sh
   This tool helps to query log directory usage on the specified brokers.
   Option                                  Description                           
   ------                                  -----------                           
   --bootstrap-server <String: The server  REQUIRED: the server(s) to use for    
     (s) to use for bootstrapping>           bootstrapping                       
   --broker-list <String: Broker list>     The list of brokers to be queried in  
                                             the form "0,1,2". All brokers in the
                                             cluster will be queried if no broker
                                             list is specified                   
   --command-config <String: Admin client  Property file containing configs to be
     property file>                          passed to Admin Client.             
   --describe                              Describe the specified log directories
                                             on the specified brokers.           
   --help                                  Print usage information.              
   --topic-list <String: Topic list>       The list of topics to be queried in   
                                             the form "topic1,topic2,topic3". All
                                             topics will be queried if no topic  
                                             list is specified (default: )       
   --version                               Display Kafka version. 
   ```
   
   This is the new no-args output with `argparse4j`:
   
   ```sh
   usage: kafka-log-dirs [-h] --bootstrap-server BOOTSTRAP-SERVER [--command-config COMMAND-CONFIG] [--topic-list TOPIC-LIST]
                         [--broker-list BROKER-LIST]
   kafka-log-dirs: error: argument --bootstrap-server is required
   ```
   
   @ijuma @mimaison WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1073900755


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   I completed the migration of these shared classes, but I still need to do some testing before opening the PR (probably tomorrow).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1097662809


##########
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##########
@@ -870,7 +870,37 @@ synchronized public AlterReplicaLogDirsResult alterReplicaLogDirs(
     @Override
     synchronized public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers,

Review Comment:
   This implementation is provided because I wanted to use MockAdminClient rather than use Mockito and mock the Admin interface. I am open to suggestions for improving the current logic.



##########
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##########
@@ -0,0 +1,68 @@
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogDirsCommandTest {
+
+    @Test
+    public void checkLogDirsCommandOutput() throws UnsupportedEncodingException, TerseException, ExecutionException, JsonProcessingException, InterruptedException {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        PrintStream printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name());
+
+        PrintStream originalStandardOut = System.out;
+        System.setOut(printStream);
+
+        Node broker = new Node(1, "hostname", 9092);
+
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {

Review Comment:
   I opted to move the unit test as is, rather than improve it. If there is a preference to improve on it, I would either use a Mockito to mock the Admin interface for describeLogDirs or I would contribute a more accurate implementation to the MockAdminClient.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1111724503


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class LogDirsCommand {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    private static void execute(String... args) throws Exception {
+        LogDirsCommandOptions options = new LogDirsCommandOptions(args);
+        try (Admin adminClient = createAdminClient(options)) {
+            execute(options, adminClient);
+        }
+    }
+
+    static void execute(LogDirsCommandOptions options, Admin adminClient) throws Exception {
+        Set<String> topics = options.topics();
+        Set<Integer> clusterBrokers = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet());
+        Set<Integer> inputBrokers = options.brokers();
+        Set<Integer> existingBrokers = inputBrokers.isEmpty() ? new HashSet<>(clusterBrokers) : new HashSet<>(inputBrokers);
+        existingBrokers.retainAll(clusterBrokers);
+        Set<Integer> nonExistingBrokers = new HashSet<>(inputBrokers);
+        nonExistingBrokers.removeAll(clusterBrokers);
+
+        if (!nonExistingBrokers.isEmpty()) {
+            throw new TerseException(
+                    String.format(

Review Comment:
   Nit: we don't need the newline here since we print the exception message using `println()` in `mainNoExit()`



##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class LogDirsCommand {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    private static void execute(String... args) throws Exception {
+        LogDirsCommandOptions options = new LogDirsCommandOptions(args);
+        try (Admin adminClient = createAdminClient(options)) {
+            execute(options, adminClient);
+        }
+    }
+
+    static void execute(LogDirsCommandOptions options, Admin adminClient) throws Exception {
+        Set<String> topics = options.topics();
+        Set<Integer> clusterBrokers = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet());
+        Set<Integer> inputBrokers = options.brokers();
+        Set<Integer> existingBrokers = inputBrokers.isEmpty() ? new HashSet<>(clusterBrokers) : new HashSet<>(inputBrokers);
+        existingBrokers.retainAll(clusterBrokers);
+        Set<Integer> nonExistingBrokers = new HashSet<>(inputBrokers);
+        nonExistingBrokers.removeAll(clusterBrokers);
+
+        if (!nonExistingBrokers.isEmpty()) {
+            throw new TerseException(
+                    String.format(
+                            "ERROR: The given brokers do not exist from --broker-list: %s. Current existent brokers: %s%n",
+                            commaDelimitedStringFromIntegerSet(nonExistingBrokers),
+                            commaDelimitedStringFromIntegerSet(clusterBrokers)));
+        } else {
+            System.out.println("Querying brokers for log directories information");
+            DescribeLogDirsResult describeLogDirsResult = adminClient.describeLogDirs(existingBrokers);
+            Map<Integer, Map<String, LogDirDescription>> logDirInfosByBroker = describeLogDirsResult.allDescriptions().get();
+
+            System.out.printf(
+                    "Received log directory information from brokers %s%n",
+                    commaDelimitedStringFromIntegerSet(existingBrokers));
+            System.out.println(formatAsJson(logDirInfosByBroker, topics));
+        }
+    }
+
+    private static String commaDelimitedStringFromIntegerSet(Set<Integer> set) {
+        return set.stream().map(String::valueOf).collect(Collectors.joining(","));
+    }
+
+    private static List<Map<String, Object>> fromReplicasInfoToPrintableRepresentation(Map<TopicPartition, ReplicaInfo> replicasInfo) {
+        return replicasInfo.entrySet().stream().map(entry -> {
+            TopicPartition topicPartition = entry.getKey();
+            return new HashMap<String, Object>() {{
+                    put("partition", topicPartition.toString());
+                    put("size", entry.getValue().size());
+                    put("offsetLag", entry.getValue().offsetLag());
+                    put("isFuture", entry.getValue().isFuture());
+                }};
+        }).collect(Collectors.toList());
+    }
+
+    private static List<Map<String, Object>> fromLogDirInfosToPrintableRepresentation(Map<String, LogDirDescription> logDirInfos, Set<String> topicSet) {
+        return logDirInfos.entrySet().stream().map(entry -> {
+            String logDir = entry.getKey();
+            return new HashMap<String, Object>() {{
+                    put("logDir", logDir);
+                    put("error", entry.getValue().error() != null ? entry.getValue().error().getClass().getName() : null);
+                    put("partitions", fromReplicasInfoToPrintableRepresentation(
+                            entry.getValue().replicaInfos().entrySet().stream().filter(entry -> {
+                                TopicPartition topicPartition = entry.getKey();
+                                return topicSet.isEmpty() || topicSet.contains(topicPartition.topic());
+                            }).collect(Collectors.toMap(Entry::getKey, Entry::getValue))
+                    ));
+                }};
+        }).collect(Collectors.toList());
+    }
+
+    private static String formatAsJson(Map<Integer, Map<String, LogDirDescription>> logDirInfosByBroker, Set<String> topicSet) throws JsonProcessingException {
+        return new ObjectMapper().writeValueAsString(new HashMap<String, Object>() {{
+                put("version", 1);
+                put("brokers", logDirInfosByBroker.entrySet().stream().map(entry -> {
+                    int broker = entry.getKey();
+                    Map<String, LogDirDescription> logDirInfos = entry.getValue();
+                    return new HashMap<String, Object>() {{
+                            put("broker", broker);
+                            put("logDirs", fromLogDirInfosToPrintableRepresentation(logDirInfos, topicSet));
+                        }};
+                }).collect(Collectors.toList()));
+            }});
+    }
+
+    private static Admin createAdminClient(LogDirsCommandOptions options) throws IOException {
+        Properties props = new Properties();
+        if (options.hasCommandConfig()) {
+            Utils.loadProps(options.commandConfig());

Review Comment:
   This should be added to `props` otherwise it's effectively ignoring the configuration file



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1179318267


##########
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogDirsCommandTest {
+
+    @Test
+    public void shouldThrowWhenQueryingNonExistentBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    public void shouldThrowWhenDuplicatedBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,0,1,2,2", "--describe"), adminClient));

Review Comment:
   This does not throw because duplicate broker ids have been provided but instead because broker 0 and 2 don't exist:
   ```
   ERROR: The given brokers do not exist from --broker-list: 0,2. Current existent brokers: 1
   ```



##########
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogDirsCommandTest {
+
+    @Test
+    public void shouldThrowWhenQueryingNonExistentBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    public void shouldThrowWhenDuplicatedBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,0,1,2,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    public void shouldQueryAllBrokersIfNonSpecified() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertTrue(standardOutputLines.length != 0);
+            assertTrue(standardOutputLines[0].contains("Querying brokers for log directories information"));

Review Comment:
   This message is also printed if you specify a specific broker id. Actually the test pass if I had `"--broker-list", "1"` to the arguments



##########
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogDirsCommandTest {
+
+    @Test
+    public void shouldThrowWhenQueryingNonExistentBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    public void shouldThrowWhenDuplicatedBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,0,1,2,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    public void shouldQueryAllBrokersIfNonSpecified() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertTrue(standardOutputLines.length != 0);
+            assertTrue(standardOutputLines[0].contains("Querying brokers for log directories information"));
+        }
+    }
+
+    @Test
+    public void shouldQuerySpecifiedBroker() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertTrue(standardOutputLines.length != 0);
+            assertTrue(standardOutputLines[0].contains("Querying brokers for log directories information"));

Review Comment:
   This test has the same assertions than the test above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
mimaison commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072229304


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   We should not change the argument parsing library in this PR. This is likely to slightly change the usage and could potentially break existing commands/scripts used by users. Yes ideally we should use a single library but we can do that after (and it's likely to require a KIP).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13122:
URL: https://github.com/apache/kafka/pull/13122#issuecomment-1506693957

   Hey @fvaleri and @mimaison, I believe I addressed Michael's comments and I rebased on trunk. Do let me know if there is something else I should do 😊 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13122:
URL: https://github.com/apache/kafka/pull/13122#issuecomment-1505038104

   @clolov can you address the comments from Mickael so that we can have a chance to merge it in time for 3.5.0? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on PR #13122:
URL: https://github.com/apache/kafka/pull/13122#issuecomment-1533310873

   Thanks @clolov for the update. There's a few checkstyle failures in LogDirsCommandTest.java


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1182519045


##########
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogDirsCommandTest {
+
+    @Test
+    public void shouldThrowWhenQueryingNonExistentBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    public void shouldThrowWhenDuplicatedBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,0,1,2,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    public void shouldQueryAllBrokersIfNonSpecified() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertTrue(standardOutputLines.length != 0);
+            assertTrue(standardOutputLines[0].contains("Querying brokers for log directories information"));

Review Comment:
   Yup, updated in the next commit.



##########
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogDirsCommandTest {
+
+    @Test
+    public void shouldThrowWhenQueryingNonExistentBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    public void shouldThrowWhenDuplicatedBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,0,1,2,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    public void shouldQueryAllBrokersIfNonSpecified() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertTrue(standardOutputLines.length != 0);
+            assertTrue(standardOutputLines[0].contains("Querying brokers for log directories information"));
+        }
+    }
+
+    @Test
+    public void shouldQuerySpecifiedBroker() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertTrue(standardOutputLines.length != 0);
+            assertTrue(standardOutputLines[0].contains("Querying brokers for log directories information"));

Review Comment:
   Yup, updated in the next commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072271288


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   Sorry, then may I get a bit of clarification on how do you envision this to work? Do you envision that a Java compatible copy of `CommandDefaultOptions` is written? I am asking this because I was battling with moving ConsoleConsumer and there I ran into the following problem:
   ```
   abstract class CommandDefaultOptions(val args: Array[String], allowCommandOptionAbbreviation: Boolean = false) {
     val parser = new OptionParser(allowCommandOptionAbbreviation)
   ...
   ```
   ```
   private static class ConsumerConfig extends CommandDefaultOptions {
           ConsumerConfig(String... args) {
               super(args, false)
               
               ... = this.parser(); <--- Cannot access joptsimple.OptionParser
   ```
   Is it in general that we are only trying to move the commands from Scala to Java or do we want a complete break from Scala classes? If we are completely moving to Java does this mean that `CommandLineUtils` needs to be rewritten first?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072022574


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {

Review Comment:
   Okay, I will do this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1071392892


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   I think that we should keep using `joptsimple` with the `CommandDefaultOptions` abstract class, rather than migrating all commands to `argparse4j`. The vast majority of commands do not use `argparse4j`.
   
   Then, the `joptsimple` (current) no-args output is very different:
   
   ```sh
   This tool helps to query log directory usage on the specified brokers.
   Option                                  Description                           
   ------                                  -----------                           
   --bootstrap-server <String: The server  REQUIRED: the server(s) to use for    
     (s) to use for bootstrapping>           bootstrapping                       
   --broker-list <String: Broker list>     The list of brokers to be queried in  
                                             the form "0,1,2". All brokers in the
                                             cluster will be queried if no broker
                                             list is specified                   
   --command-config <String: Admin client  Property file containing configs to be
     property file>                          passed to Admin Client.             
   --describe                              Describe the specified log directories
                                             on the specified brokers.           
   --help                                  Print usage information.              
   --topic-list <String: Topic list>       The list of topics to be queried in   
                                             the form "topic1,topic2,topic3". All
                                             topics will be queried if no topic  
                                             list is specified (default: )       
   --version                               Display Kafka version. 
   ```
   
   New output:
   
   ```sh
   usage: kafka-log-dirs [-h] --bootstrap-server BOOTSTRAP-SERVER [--command-config COMMAND-CONFIG] [--topic-list TOPIC-LIST]
                         [--broker-list BROKER-LIST]
   kafka-log-dirs: error: argument --bootstrap-server is required
   ```
   
   @ijuma @mimaison WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1071371063


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+

Review Comment:
   Missing comment.



##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   I think that we should keep using `joptsimple` with the `CommandDefaultOptions` abstract class, rather than migrating all commands to `argparse4j`. The vast majority of commands do not use `argparse4j`.
   
   Then, the no-args output is very different:
   
   ```sh
   This tool helps to query log directory usage on the specified brokers.
   Option                                  Description                           
   ------                                  -----------                           
   --bootstrap-server <String: The server  REQUIRED: the server(s) to use for    
     (s) to use for bootstrapping>           bootstrapping                       
   --broker-list <String: Broker list>     The list of brokers to be queried in  
                                             the form "0,1,2". All brokers in the
                                             cluster will be queried if no broker
                                             list is specified                   
   --command-config <String: Admin client  Property file containing configs to be
     property file>                          passed to Admin Client.             
   --describe                              Describe the specified log directories
                                             on the specified brokers.           
   --help                                  Print usage information.              
   --topic-list <String: Topic list>       The list of topics to be queried in   
                                             the form "topic1,topic2,topic3". All
                                             topics will be queried if no topic  
                                             list is specified (default: )       
   --version                               Display Kafka version. 
   ```
   
   New output:
   
   ```sh
   usage: kafka-log-dirs [-h] --bootstrap-server BOOTSTRAP-SERVER [--command-config COMMAND-CONFIG] [--topic-list TOPIC-LIST]
                         [--broker-list BROKER-LIST]
   kafka-log-dirs: error: argument --bootstrap-server is required
   ```
   
   @ijuma @mimaison WDYT?



##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {

Review Comment:
   Can you just declare Exception to be compatible with MetadataQuorumCommand?



##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {

Review Comment:
   Can you also add TerseException catch as shown in MetadataQuorumCommand. At some point we will probably create a Command interface.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1098445584


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LogDirsCommand {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            LogDirsCommandOptions logDirsCommandOptions = new LogDirsCommandOptions(args);
+            try (Admin adminClient = createAdminClient(logDirsCommandOptions)) {
+                execute(logDirsCommandOptions, adminClient);
+            }
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(LogDirsCommandOptions logDirsCommandOptions, Admin adminClient) throws TerseException, JsonProcessingException, ExecutionException, InterruptedException {

Review Comment:
   I would suggest to simply declare Exception, rather than listing every single exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13122:
URL: https://github.com/apache/kafka/pull/13122#issuecomment-1420901116

   Heya @fvaleri, I hope I have addressed all your comments.
   Unless I am wrong, the only difference between the two outputs is that in the old the error was
   ```
   "error": null
   ```
   while in the new the error is
   ```
   "error": {
     "empty": true,
     "present": false
   },
   ```
   I have fixed this in subsequent commits


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
clolov commented on PR #13122:
URL: https://github.com/apache/kafka/pull/13122#issuecomment-1384049900

   Tagging @mimaison, @fvaleri and @dengziming as reviewers of https://github.com/apache/kafka/pull/13080


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072326336


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   If it isn't too big of a problem may I have the 2 classes and see how far I can go with them at least in this pull request?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13122:
URL: https://github.com/apache/kafka/pull/13122#issuecomment-1424403375

   @mimaison I think this is ready if you have time for a review :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13122:
URL: https://github.com/apache/kafka/pull/13122#issuecomment-1505525910

   Yes, sorry, this feel off my radar. You will have a next commit tomorrow morning!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072022248


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+

Review Comment:
   Yup, this is my bad, I will add it in subsequent commits.



##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {

Review Comment:
   Sure



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072305087


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   > If we are completely moving to Java does this mean that CommandLineUtils needs to be rewritten first?
   
   Yes, I already created `CommandLineUtils` as part of the `DumpLogSegments` migration, which also includes `CommandDefaultOptions`, but I still have some work to do before opening the PR. Let me know if you want a separate PR with just these 3 shared classes.
   
   That said, I think we will need to have tools->core temporary dependency to avoid duplicating critical code like `GroupMetadataManager.formatRecordKeyAndValue(record)` and `TransactionLog.formatRecordKeyAndValue(record)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072305087


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   > If we are completely moving to Java does this mean that CommandLineUtils needs to be rewritten first?
   
   Yes, I already created `CommandLineUtils` as part of the `DumpLogSegments` migration, which also includes `CommandDefaultOptions`, but I still have some work to do before opening the PR. Let me know if you want a separate PR with just these 2 shared classes.
   
   That said, I think we will need to have tools->core temporary dependency to avoid duplicating critical code like `GroupMetadataManager.formatRecordKeyAndValue(record)` and `TransactionLog.formatRecordKeyAndValue(record)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1073197282


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   Yes, I'll open a dedicated PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1098517712


##########
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##########
@@ -870,7 +870,37 @@ synchronized public AlterReplicaLogDirsResult alterReplicaLogDirs(
     @Override
     synchronized public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers,

Review Comment:
   That's fine. I would suggest to look at JmxToolTest for a way to get out/err content and assert on it, which is what the original test does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13122:
URL: https://github.com/apache/kafka/pull/13122#issuecomment-1531443138

   Heya @mimaison, thank you for the review! I have updated the assertions to be more precise. I wanted to avoid creating a whole new POJO to turn the String into a navigable map, but if you find the usage of SuppressWarnings unsatisfactory I can put in the time. Please let me know what you think!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1183882730


##########
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class LogDirsCommandTest {
+
+    @Test
+    public void shouldThrowWhenQueryingNonExistentBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    public void shouldNotThrowWhenDuplicatedBrokers() throws JsonProcessingException {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1,1", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertEquals(3, standardOutputLines.length);
+            @SuppressWarnings("unchecked")
+            Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+            @SuppressWarnings("unchecked")
+            List<Object> brokerInformation = (List<Object>) information.get("brokers");
+            @SuppressWarnings("unchecked")
+            Integer brokerId = (Integer) ((HashMap<String, Object>) brokerInformation.get(0)).get("broker");
+            assertEquals(1, brokerInformation.size());
+            assertEquals(1, brokerId);
+        }
+    }
+
+    @Test
+    public void shouldQueryAllBrokersIfNonSpecified() throws JsonProcessingException {
+        Node brokerOne = new Node(1, "hostname", 9092);
+        Node brokerTwo = new Node(2, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Arrays.asList(brokerOne, brokerTwo), brokerOne)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertEquals(3, standardOutputLines.length);
+            @SuppressWarnings("unchecked")
+            Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+            @SuppressWarnings("unchecked")
+            List<Object> brokerInformation = (List<Object>) information.get("brokers");
+            @SuppressWarnings("unchecked")
+            Integer brokerOneId = (Integer) ((HashMap<String, Object>) brokerInformation.get(0)).get("broker");
+            @SuppressWarnings("unchecked")
+            Integer brokerTwoId = (Integer) ((HashMap<String, Object>) brokerInformation.get(1)).get("broker");

Review Comment:
   You are absolutely correct, order is not guaranteed. In the newest commit I check that the elements are present in a set. Thank you for caching this!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1183881591


##########
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class LogDirsCommandTest {
+
+    @Test
+    public void shouldThrowWhenQueryingNonExistentBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    public void shouldNotThrowWhenDuplicatedBrokers() throws JsonProcessingException {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1,1", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertEquals(3, standardOutputLines.length);
+            @SuppressWarnings("unchecked")

Review Comment:
   Okay, moved it to the top of the methods!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1182729223


##########
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class LogDirsCommandTest {
+
+    @Test
+    public void shouldThrowWhenQueryingNonExistentBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    public void shouldNotThrowWhenDuplicatedBrokers() throws JsonProcessingException {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1,1", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertEquals(3, standardOutputLines.length);
+            @SuppressWarnings("unchecked")

Review Comment:
   In this case I think it makes sense to put `SuppressWarnings` at the method level. Same below



##########
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class LogDirsCommandTest {
+
+    @Test
+    public void shouldThrowWhenQueryingNonExistentBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    public void shouldNotThrowWhenDuplicatedBrokers() throws JsonProcessingException {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1,1", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertEquals(3, standardOutputLines.length);
+            @SuppressWarnings("unchecked")
+            Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+            @SuppressWarnings("unchecked")
+            List<Object> brokerInformation = (List<Object>) information.get("brokers");
+            @SuppressWarnings("unchecked")
+            Integer brokerId = (Integer) ((HashMap<String, Object>) brokerInformation.get(0)).get("broker");
+            assertEquals(1, brokerInformation.size());
+            assertEquals(1, brokerId);
+        }
+    }
+
+    @Test
+    public void shouldQueryAllBrokersIfNonSpecified() throws JsonProcessingException {
+        Node brokerOne = new Node(1, "hostname", 9092);
+        Node brokerTwo = new Node(2, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Arrays.asList(brokerOne, brokerTwo), brokerOne)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertEquals(3, standardOutputLines.length);
+            @SuppressWarnings("unchecked")
+            Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+            @SuppressWarnings("unchecked")
+            List<Object> brokerInformation = (List<Object>) information.get("brokers");
+            @SuppressWarnings("unchecked")
+            Integer brokerOneId = (Integer) ((HashMap<String, Object>) brokerInformation.get(0)).get("broker");
+            @SuppressWarnings("unchecked")
+            Integer brokerTwoId = (Integer) ((HashMap<String, Object>) brokerInformation.get(1)).get("broker");

Review Comment:
   Is the order guaranteed here or could the 2nd entry in the list be brokerOneId?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison merged pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison merged PR #13122:
URL: https://github.com/apache/kafka/pull/13122


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1182518605


##########
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogDirsCommandTest {
+
+    @Test
+    public void shouldThrowWhenQueryingNonExistentBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    public void shouldThrowWhenDuplicatedBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,0,1,2,2", "--describe"), adminClient));

Review Comment:
   That's fair, looking over the old code it did not actually throw if it encountered duplicate brokers either. Since I use a set the duplicates just get swallowed up. I have updated the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072024066


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   To be honest, I used `argparse4j` because I looked at https://github.com/apache/kafka/pull/13080/files as a reference. I don't have a strong opinion either way, but the first output you presented seems nicer to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1073900755


##########
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   I completed the migration of the shared classes, but I still need to do some testing before opening the PR (probably tomorrow).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org