Posted to jira@kafka.apache.org by "ruslankrivoshein (via GitHub)" <gi...@apache.org> on 2023/04/13 18:34:14 UTC

[GitHub] [kafka] ruslankrivoshein opened a new pull request, #13562: KAFKA-14581: Moving GetOffsetShell to tools

ruslankrivoshein opened a new pull request, #13562:
URL: https://github.com/apache/kafka/pull/13562

   WIP
   
   This PR is based on [another PR](https://github.com/apache/kafka/pull/13158). During development I simply copied those files, since there were no further remarks.
   I've also gotten stuck on one thing. GetOffsetShellTest extends KafkaServerTestHarness, and KafkaServerTestHarness extends QuorumTestHarness; all of them are written in Scala, so I'd like suggestions on how to approach the refactoring. A lot of test classes depend on KafkaServerTestHarness, so rewriting GetOffsetShellTest would mean rewriting that class at the same time. Should I start rewriting it completely, or may I write a stub for now?
   I also have to ask about the createTopicPartitionFilterWithPatternList and createTopicPartitionFilterWithTopicAndPartitionPattern methods. For the sake of a clean test transfer I've made those methods public. Is that acceptable for such a small utility command?
   
   @ijuma, @mimaison, I hope I've tagged the right people. Could you take a look and clarify this for me?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1195250433


##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ZK)
+@Tag("integration")
+public class GetOffsetShellTest {
+    private final int topicCount = 4;
+    private final int offsetTopicPartitionCount = 4;
+    private final ClusterInstance cluster;
+
+    public GetOffsetShellTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    private String getTopicName(int i) {
+        return "topic" + i;
+    }
+
+    @BeforeEach
+    public void setUp() {
+    }
+
+    static class Row {
+        private String name;
+        private int partition;
+        private Long timestamp;
+
+        public Row(String name, int partition, Long timestamp) {
+            this.name = name;
+            this.partition = partition;
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) return true;
+
+            if (!(o instanceof Row)) return false;
+
+            Row r = (Row) o;
+
+            return name.equals(r.name) && partition == r.partition && Objects.equals(timestamp, r.timestamp);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(name, partition, timestamp);
+        }
+    }
+
+    @ClusterTest
+    public void testNoFilterOptions() {
+        try(Admin admin = Admin.create(cluster.config().adminClientProperties())) {
+            List<NewTopic> topics = new ArrayList<>();
+
+            IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short)1)));
+
+            admin.createTopics(topics);
+        }
+
+        Properties props = new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.config().producerProperties().get("bootstrap.servers"));
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+            IntStream.range(0, topicCount + 1)
+                     .forEach(i -> IntStream.range(0, i * i)
+                            .forEach(msgCount -> producer.send(
+                                    new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)))
+                     );
+        }

Review Comment:
   I decided to adapt the tests to this type of test extension, but now I'm struggling with environment preparation. I don't know how to get access to `cluster` in `@BeforeEach`. The `@ClusterTest` annotation doesn't help, so I'd like to know how I can create topics before the tests.
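
   One possible workaround, offered here only as a sketch under the assumption that the constructor-injected `ClusterInstance` (used by `testNoFilterOptions` above) is ready by the time each test body runs: skip `@BeforeEach` and create the topics from a plain helper that every `@ClusterTest` method calls first.

   ```java
   // Minimal sketch (an assumption, not the code the PR ended up with): create the topics
   // from a helper invoked at the start of each @ClusterTest method, relying on the
   // constructor-injected ClusterInstance rather than @BeforeEach.
   private void setUpTopics() throws Exception {
       try (Admin admin = Admin.create(cluster.config().adminClientProperties())) {
           List<NewTopic> topics = new ArrayList<>();
           // topic1..topic4 with 1..4 partitions, mirroring the loop in testNoFilterOptions
           IntStream.range(1, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short) 1)));
           // wait for creation to finish so the test body actually sees the topics
           admin.createTopics(topics).all().get();
       }
   }
   ```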





[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1197583887


##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ZK)
+@Tag("integration")
+public class GetOffsetShellTest {
+    private final int topicCount = 4;
+    private final int offsetTopicPartitionCount = 4;
+    private final ClusterInstance cluster;
+
+    public GetOffsetShellTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    private String getTopicName(int i) {
+        return "topic" + i;
+    }
+
+    @BeforeEach
+    public void setUp() {

Review Comment:
   Yes, we can get the cluster variable, but in this approach the `adminClientProperties` in `ClusterConfig` isn't initialized yet, as in `ApiVersionsRequestTest`. I can't figure out where this gets initialized...





[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1307122178


##########
core/src/main/scala/kafka/tools/GetOffsetShell.scala:
##########


Review Comment:
   @dengziming are you good with this?





[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1286370800


##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    private static final Pattern TOPIC_PARTITION_PATTERN = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException, TerseException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        GetOffsetShellOptions options = new GetOffsetShellOptions(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets(options);
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private static class GetOffsetShellOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> topicPartitionsOpt;
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> partitionsOpt;
+        private final OptionSpec<String> timeOpt;
+        private final OptionSpec<String> commandConfigOpt;
+        private final OptionSpec<String> effectiveBrokerListOpt;
+        private final OptionSpecBuilder excludeInternalTopicsOpt;
+
+        public GetOffsetShellOptions(String[] args) throws TerseException {
+            super(args);
+
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + TOPIC_PARTITION_PATTERN + "'." +
+                            " The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
+                            " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
+                    .withRequiredArg()
+                    .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("partition ids")
+                    .ofType(String.class);
+            timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
+                    .withRequiredArg()
+                    .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                    .ofType(String.class)
+                    .defaultsTo("latest");
+            commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
+
+            if (args.length == 0) {
+                CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
+            }
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                throw new TerseException(e.getMessage());
+            }
+
+            if (options.has(bootstrapServerOpt)) {
+                effectiveBrokerListOpt = bootstrapServerOpt;
+            } else {
+                effectiveBrokerListOpt = brokerListOpt;
+            }
+
+            CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
+
+            String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+            try {
+                ToolsUtils.validateBootstrapServer(brokerList);
+            } catch (IllegalArgumentException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+        }
+
+        public boolean hasTopicPartitionsOpt() {
+            return options.has(topicPartitionsOpt);
+        }
+
+        public String topicPartitionsOpt() {
+            return options.valueOf(topicPartitionsOpt);
+        }
+
+        public boolean hasTopicOpt() {
+            return options.has(topicOpt);
+        }
+
+        public String topicOpt() {
+            return options.valueOf(topicOpt);
+        }
+
+        public boolean hasPartitionsOpt() {
+            return options.has(partitionsOpt);
+        }
+
+        public String partitionsOpt() {
+            return options.valueOf(partitionsOpt);
+        }
+
+        public String timeOpt() {
+            return options.valueOf(timeOpt);
+        }
+
+        public boolean hasCommandConfigOpt() {
+            return options.has(commandConfigOpt);
+        }
+
+        public String commandConfigOpt() {
+            return options.valueOf(commandConfigOpt);
+        }
+
+        public String effectiveBrokerListOpt() {
+            return options.valueOf(effectiveBrokerListOpt);
+        }
+
+        public boolean hasExcludeInternalTopicsOpt() {
+            return options.has(excludeInternalTopicsOpt);
+        }
+    }
+
+    public Map<TopicPartition, Long> fetchOffsets(GetOffsetShellOptions options) throws IOException, ExecutionException, InterruptedException, TerseException {
+        String clientId = "GetOffsetShell";
+        String brokerList = options.effectiveBrokerListOpt();
+
+        if (options.hasTopicPartitionsOpt() && (options.hasTopicOpt() || options.hasPartitionsOpt())) {
+            throw new TerseException("--topic-partitions cannot be used with --topic or --partitions");
+        }
+
+        boolean excludeInternalTopics = options.hasExcludeInternalTopicsOpt();
+        OffsetSpec offsetSpec = parseOffsetSpec(options.timeOpt());
+
+        TopicPartitionFilter topicPartitionFilter;
+
+        if (options.hasTopicPartitionsOpt()) {
+            topicPartitionFilter = createTopicPartitionFilterWithPatternList(options.topicPartitionsOpt());
+        } else {
+            topicPartitionFilter = createTopicPartitionFilterWithTopicAndPartitionPattern(
+                    options.hasTopicOpt() ? options.topicOpt() : null,
+                    options.partitionsOpt()
+            );
+        }
+
+        Properties config;
+
+        if (options.hasCommandConfigOpt()) {
+            config = Utils.loadProps(options.commandConfigOpt());
+        } else {
+            config = new Properties();
+        }
+
+        config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId);
+
+        try (Admin adminClient = Admin.create(config)) {
+            List<TopicPartition> partitionInfos = listPartitionInfos(adminClient, topicPartitionFilter, excludeInternalTopics);
+
+            if (partitionInfos.isEmpty()) {
+                throw new TerseException("Could not match any topic-partitions with the specified filters");
+            }
+
+            Map<TopicPartition, OffsetSpec> timestampsToSearch = partitionInfos.stream().collect(Collectors.toMap(tp -> tp, tp -> offsetSpec));
+
+            ListOffsetsResult listOffsetsResult = adminClient.listOffsets(timestampsToSearch);
+
+            TreeMap<TopicPartition, Long> partitionOffsets = new TreeMap<>(Comparator.comparing(TopicPartition::toString));
+
+            for (TopicPartition partition : partitionInfos) {
+                ListOffsetsResultInfo partitionInfo;
+
+                try {
+                    partitionInfo = listOffsetsResult.partitionResult(partition).get();
+                } catch (KafkaException e) {
+                    System.err.println("Skip getting offsets for topic-partition " + partition.toString() + " due to error: " + e.getMessage());
+
+                    continue;
+                } catch (InterruptedException | ExecutionException ignored) {
+                    continue;
+                }
+
+                if (partitionInfo.offset() != ListOffsetsResponse.UNKNOWN_OFFSET) {
+                    partitionOffsets.put(partition, partitionInfo.offset());
+                }
+            }
+
+            return partitionOffsets;
+        }
+    }
+
+    private OffsetSpec parseOffsetSpec(String listOffsetsTimestamp) throws TerseException {
+        switch (listOffsetsTimestamp) {
+            case "earliest":
+                return OffsetSpec.earliest();
+            case "latest":
+                return OffsetSpec.latest();
+            case "max-timestamp":
+                return OffsetSpec.maxTimestamp();
+            default:
+                long timestamp;
+
+                try {
+                    timestamp = Long.parseLong(listOffsetsTimestamp);
+                } catch (NumberFormatException e) {
+                    throw new TerseException("Malformed time argument " + listOffsetsTimestamp + ". " +
+                            "Please use -1 or latest / -2 or earliest / -3 or max-timestamp, or a specified long format timestamp");
+                }
+
+                if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
+                    return OffsetSpec.earliest();
+                } else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
+                    return OffsetSpec.latest();
+                } else if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+                    return OffsetSpec.maxTimestamp();
+                } else {
+                    return OffsetSpec.forTimestamp(timestamp);
+                }
+        }
+    }
+
+    /**
+     * Creates a topic-partition filter based on a list of patterns.
+     * Expected format:
+     * List: TopicPartitionPattern(, TopicPartitionPattern)*
+     * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+     * TopicPattern: REGEX
+     * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+     */
+    public TopicPartitionFilter createTopicPartitionFilterWithPatternList(String topicPartitions) {
+        List<String> ruleSpecs = Arrays.asList(topicPartitions.split(","));
+        List<TopicPartitionFilter> rules = ruleSpecs.stream().map(ruleSpec -> {
+            try {
+                return parseRuleSpec(ruleSpec);
+            } catch (TerseException e) {
+                throw new RuntimeException(e);
+            }
+        }).collect(Collectors.toList());
+
+        return new CompositeTopicPartitionFilter(rules);
+    }
+
+    /**
+     * Creates a topic-partition filter based on a topic pattern and a set of partition ids.
+     */
+    public TopicPartitionFilter createTopicPartitionFilterWithTopicAndPartitionPattern(String topicOpt, String partitionIds) throws TerseException {
+        return new TopicFilterAndPartitionFilter(
+                new IncludeList(topicOpt != null ? topicOpt : ".*"),
+                new PartitionsSetFilter(createPartitionSet(partitionIds))
+        );
+    }
+
+    private Set<Integer> createPartitionSet(String partitionsString) throws TerseException {
+        Set<Integer> partitions;
+
+        if (partitionsString == null || partitionsString.isEmpty()) {
+            partitions = Collections.emptySet();
+        } else {
+            try {
+                partitions = Arrays.stream(partitionsString.split(",")).map(Integer::parseInt).collect(Collectors.toSet());
+            } catch (NumberFormatException e) {
+                throw new TerseException("--partitions expects a comma separated list of numeric " +
+                        "partition ids, but received: " + partitionsString);
+            }
+        }
+
+        return partitions;
+    }
+
+    /**
+     * Return the partition infos. Filter them with topicPartitionFilter.
+     */
+    private List<TopicPartition> listPartitionInfos(
+            Admin client,
+            TopicPartitionFilter topicPartitionFilter,
+            boolean excludeInternalTopics
+    ) throws ExecutionException, InterruptedException {
+        ListTopicsOptions listTopicsOptions = new ListTopicsOptions().listInternal(!excludeInternalTopics);
+        Set<String> topics = client.listTopics(listTopicsOptions).names().get();
+        Set<String> filteredTopics = topics.stream().filter(topicPartitionFilter::isTopicAllowed).collect(Collectors.toSet());
+
+        return client.describeTopics(filteredTopics).allTopicNames().get().entrySet().stream().flatMap(
+                topic -> topic.getValue().partitions().stream().map(
+                        tp -> new TopicPartition(topic.getKey(), tp.partition())
+                ).filter(topicPartitionFilter::isTopicPartitionAllowed)
+        ).collect(Collectors.toList());
+    }
+
+    private TopicPartitionFilter parseRuleSpec(String ruleSpec) throws TerseException, RuntimeException {
+        Matcher matcher = TOPIC_PARTITION_PATTERN.matcher(ruleSpec);
+
+        if (!matcher.matches())
+            throw new TerseException("Invalid rule specification: " + ruleSpec);

Review Comment:
   It was a recommendation from @fvaleri.





[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1285035259


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -99,4 +101,26 @@ public static void prettyPrintTable(
         printRow(columnLengths, headers, out);
         rows.forEach(row -> printRow(columnLengths, row, out));
     }
+
+    public static void validateBootstrapServer(String hostPort) throws IllegalArgumentException {
+        if (hostPort == null || hostPort.isEmpty()) {

Review Comment:
   Done





[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1285005842


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -99,4 +101,26 @@ public static void prettyPrintTable(
         printRow(columnLengths, headers, out);
         rows.forEach(row -> printRow(columnLengths, row, out));
     }
+
+    public static void validateBootstrapServer(String hostPort) throws IllegalArgumentException {
+        if (hostPort == null || hostPort.isEmpty()) {

Review Comment:
   Since Java 11 we can use `isBlank()`, which also ignores whitespace. Unfortunately, we still need to maintain compatibility with Java 8, where we can achieve the same with `hostPort.trim().isEmpty()`.
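
   For illustration, a minimal Java 8 compatible sketch of that check (the method name and error message below are assumptions, not the exact PR code):

   ```java
   public static void validateBootstrapServer(String hostPort) {
       // trim().isEmpty() stands in for the Java 11 isBlank() mentioned above
       if (hostPort == null || hostPort.trim().isEmpty()) {
           throw new IllegalArgumentException("Invalid bootstrap server value: " + hostPort);
       }
   }
   ```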





Re: [PR] KAFKA-14581: Moving GetOffsetShell to tools [kafka]

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1742817052

   @fvaleri I think it's time to make a note about these changes in [KAFKA-14705](https://issues.apache.org/jira/browse/KAFKA-14705), isn't it?




[GitHub] [kafka] dengziming commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1287886679


##########
core/src/main/scala/kafka/tools/GetOffsetShell.scala:
##########


Review Comment:
   We should still keep this file so that it can forward all args to the new GetOffsetShell; see `FeatureCommand.scala`.
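
   For illustration only, the forwarding idea is roughly the following (sketched in Java for consistency with the other examples here; the file kept in core is Scala, as with `FeatureCommand.scala`, so treat the shape rather than the exact code as the point):

   ```java
   // Hypothetical thin wrapper: the old entry point just delegates to the relocated tool.
   public final class GetOffsetShellWrapper {
       public static void main(String[] args) {
           org.apache.kafka.tools.GetOffsetShell.main(args);
       }
   }
   ```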





[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1710177527

   > update the description to list your changes
   
   @dengziming or @fvaleri, could you clarify that for me, please?




[GitHub] [kafka] dengziming merged pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming merged PR #13562:
URL: https://github.com/apache/kafka/pull/13562




[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1255510339


##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    Pattern topicPartitionPattern = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        getOffsetShell.parseArgs(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets();
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private OptionSet options;
+    private OptionSpec<String> topicPartitionsOpt;
+    private OptionSpec<String> topicOpt;
+    private OptionSpec<String> partitionsOpt;
+    private OptionSpec<String> timeOpt;
+    private OptionSpec<String> commandConfigOpt;
+    private OptionSpec<String> effectiveBrokerListOpt;
+    private OptionSpecBuilder excludeInternalTopicsOpt;
+
+    public void parseArgs(final String[] args) {
+        final OptionParser parser = new OptionParser(false);
+
+        OptionSpec<String> brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                .withRequiredArg()
+                .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                .ofType(String.class);
+        OptionSpec<String> bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                .requiredUnless("broker-list")
+                .withRequiredArg()
+                .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                .ofType(String.class);
+        topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + topicPartitionPattern + "'." +
+                        " The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
+                        " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
+                .withRequiredArg()
+                .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+                .ofType(String.class);
+        topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                .withRequiredArg()
+                .describedAs("topic")
+                .ofType(String.class);
+        partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                .withRequiredArg()
+                .describedAs("partition ids")
+                .ofType(String.class);
+        timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
+                .withRequiredArg()
+                .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                .ofType(String.class)
+                .defaultsTo("latest");
+        commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+        excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
+
+        if (args.length == 0) {
+            CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
+        }
+
+        options = parser.parse(args);
+
+        if (options.has(bootstrapServerOpt)) {
+            effectiveBrokerListOpt = bootstrapServerOpt;
+        } else {
+            effectiveBrokerListOpt = brokerListOpt;
+        }
+
+        CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
+
+        String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+        ToolsUtils.validatePortOrDie(parser, brokerList);

Review Comment:
   In order to be consistent with other tools, I would call it `validatePortOrExit`.



##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    Pattern topicPartitionPattern = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        getOffsetShell.parseArgs(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets();
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private OptionSet options;
+    private OptionSpec<String> topicPartitionsOpt;
+    private OptionSpec<String> topicOpt;
+    private OptionSpec<String> partitionsOpt;
+    private OptionSpec<String> timeOpt;
+    private OptionSpec<String> commandConfigOpt;
+    private OptionSpec<String> effectiveBrokerListOpt;
+    private OptionSpecBuilder excludeInternalTopicsOpt;
+
+    public void parseArgs(final String[] args) {
+        final OptionParser parser = new OptionParser(false);
+
+        OptionSpec<String> brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                .withRequiredArg()
+                .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                .ofType(String.class);
+        OptionSpec<String> bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                .requiredUnless("broker-list")
+                .withRequiredArg()
+                .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                .ofType(String.class);
+        topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + topicPartitionPattern + "'." +
+                        " The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
+                        " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
+                .withRequiredArg()
+                .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+                .ofType(String.class);
+        topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                .withRequiredArg()
+                .describedAs("topic")
+                .ofType(String.class);
+        partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                .withRequiredArg()
+                .describedAs("partition ids")
+                .ofType(String.class);
+        timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
+                .withRequiredArg()
+                .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                .ofType(String.class)
+                .defaultsTo("latest");
+        commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+        excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
+
+        if (args.length == 0) {
+            CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
+        }
+
+        options = parser.parse(args);
+
+        if (options.has(bootstrapServerOpt)) {
+            effectiveBrokerListOpt = bootstrapServerOpt;
+        } else {
+            effectiveBrokerListOpt = brokerListOpt;
+        }
+
+        CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
+
+        String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+        ToolsUtils.validatePortOrDie(parser, brokerList);

Review Comment:
   In order to be consistent with other tools and utility classes, I would call it `validatePortOrExit`.





[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1271470073


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   So it should also be moved from kafka/tests/core to kafka/tests/tools?





[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1270873057


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   And what should it do when it's null?
   Could you also tell me more about the improvement? I can't find an FQCN in my implementation. And how should it be corrected?





[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1233594609


##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    Pattern topicPartitionPattern = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        getOffsetShell.parseArgs(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets();
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private OptionSet options;
+    private OptionSpec<String> topicPartitionsOpt;
+    private OptionSpec<String> topicOpt;
+    private OptionSpec<String> partitionsOpt;
+    private OptionSpec<String> timeOpt;
+    private OptionSpec<String> commandConfigOpt;
+    private OptionSpec<String> effectiveBrokerListOpt;
+    private OptionSpecBuilder excludeInternalTopicsOpt;
+
+    public void parseArgs(final String[] args) {
+        final OptionParser parser = new OptionParser(false);
+
+        OptionSpec<String> brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                .withRequiredArg()
+                .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                .ofType(String.class);
+        OptionSpec<String> bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                .requiredUnless("broker-list")
+                .withRequiredArg()
+                .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                .ofType(String.class);
+        topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + topicPartitionPattern + "'." +
+                        " The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
+                        " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
+                .withRequiredArg()
+                .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+                .ofType(String.class);
+        topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                .withRequiredArg()
+                .describedAs("topic")
+                .ofType(String.class);
+        partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                .withRequiredArg()
+                .describedAs("partition ids")
+                .ofType(String.class);
+        timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
+                .withRequiredArg()
+                .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                .ofType(String.class)
+                .defaultsTo("latest");
+        commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+        excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
+
+        if (args.length == 0) {
+            CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
+        }
+
+        options = parser.parse(args);
+
+        if (options.has(bootstrapServerOpt)) {
+            effectiveBrokerListOpt = bootstrapServerOpt;
+        } else {
+            effectiveBrokerListOpt = brokerListOpt;
+        }
+
+        CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
+
+        String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+        ToolsUtils.validatePortOrDie(parser, brokerList);
+    }
+
+    public Map<TopicPartition, Long> fetchOffsets() throws IOException, ExecutionException, InterruptedException {
+        String clientId = "GetOffsetShell";
+        String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+        if (options.has(topicPartitionsOpt) && (options.has(topicOpt) || options.has(partitionsOpt))) {
+            throw new IllegalArgumentException("--topic-partitions cannot be used with --topic or --partitions");
+        }
+
+        boolean excludeInternalTopics = options.has(excludeInternalTopicsOpt);
+        OffsetSpec offsetSpec = parseOffsetSpec(options.valueOf(timeOpt));
+
+        TopicPartitionFilter topicPartitionFilter;

Review Comment:
   That class is in the PR mentioned in the description. I hope it will be merged eventually :)





Re: [PR] KAFKA-14581: Moving GetOffsetShell to tools [kafka]

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1742881176

   > @fvaleri I think it's time to take a note about this changes in [KAFKA-14705](https://issues.apache.org/jira/browse/KAFKA-14705), isn't it?
   
   This should fall under the phrase: "4. We should also get rid of many deprecated options across all tools, including not migrated tools."
   
   If you want to provide a list of all deprecated options across all tools, that would be great, and I will be happy to review.




[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1711300123

   @dengziming is there any template for this or an example?




[GitHub] [kafka] dengziming commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1713063574

   Test failures are unrelated
   ```
   [Build / JDK 8 and Scala 2.12 / kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13562/23/testReport/junit/kafka.admin/TopicCommandIntegrationTest/Build___JDK_8_and_Scala_2_12___testDescribeUnderMinIsrPartitionsMixed_String__quorum_kraft/)
   [Build / JDK 17 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13562/23/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_17_and_Scala_2_13___testRackAwareRangeAssignor__/)
   [Build / JDK 17 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13562/23/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_17_and_Scala_2_13___testRackAwareRangeAssignor___2/)
   [Build / JDK 17 and Scala 2.13 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testDescribeTokenForOtherUserFails(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13562/23/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationWithOwnerTest/Build___JDK_17_and_Scala_2_13___testDescribeTokenForOtherUserFails_String__quorum_kraft/)
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13562/23/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_17_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
   [Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testSyncTopicConfigs()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13562/23/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_11_and_Scala_2_13___testSyncTopicConfigs__/)
   [Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13562/23/testReport/junit/org.apache.kafka.connect.integration/ConnectorRestartApiIntegrationTest/Build___JDK_11_and_Scala_2_13___testMultiWorkerRestartOnlyConnector/)
   [Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13562/23/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_11_and_Scala_2_13___testBalancePartitionLeaders__/)
   ```




[GitHub] [kafka] dengziming commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1548887516

   @ruslankrivoshein  Are you encountering the issue in KRaft or ZK mode? You can push your code first, then we can take a look. 🤝




[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1197598893


##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ZK)
+@Tag("integration")
+public class GetOffsetShellTest {
+    private final int topicCount = 4;
+    private final int offsetTopicPartitionCount = 4;
+    private final ClusterInstance cluster;
+
+    public GetOffsetShellTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    private String getTopicName(int i) {
+        return "topic" + i;
+    }
+
+    @BeforeEach
+    public void setUp() {

Review Comment:
   Actually, it doesn't work even in `ApiVersionsRequestTest`. `adminClientProperties` in `clusterConfig` is not initialized there yet, so we can't reach the cluster. [this](https://github.com/apache/kafka/blob/6f197301646135e0bb39a461ca0a07c09c3185fb/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala#L145) line must be executed first.
   





[GitHub] [kafka] dengziming commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1516312511

   Did you forget to remove GetOffsetShell.scala and change the `bin/kafka-get-offsets.sh` command?
   
   > KafkaServerTestHarness extends QuorumTestHarness, all of them are written in Scala
   If you are interested, you can change GetOffsetShellTest to use `ClusterTestExtensions`; you can refer to `MetadataQuorumCommandTest` as an example, but I think it's arduous work.
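
   For reference, a bare-bones sketch of that `ClusterTestExtensions` setup (mirroring the annotations and constructor injection that `GetOffsetShellTest` in this PR ends up using; the test method and its body are placeholders only):
   ```
   @ExtendWith(value = ClusterTestExtensions.class)
   @ClusterTestDefaults(clusterType = Type.ZK)
   @Tag("integration")
   public class GetOffsetShellTest {
       private final ClusterInstance cluster;

       // the extension injects the running cluster into the constructor
       public GetOffsetShellTest(ClusterInstance cluster) {
           this.cluster = cluster;
       }

       @ClusterTest
       public void testTopicNameArg() {
           // placeholder: run the tool against cluster.bootstrapServers() and assert on its output
       }
   }
   ```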
   




[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1274685519


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   @ruslankrivoshein let me know when you have time to apply that patch, so that I can do the final review. Thanks.





[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1280396762


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   @fvaleri when I add null checking there, I run into this situation. Should I really add it, or must this failure be worked around?
   ```
   Verification failed: SpotBugs ended with exit code 1
   NP_NULL_ON_SOME_PATH: Possible null pointer dereference
   ```





[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1280526285


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   @fvaleri ah, I'm sorry, I finally investigated it and worked around it.
   What do you think about this approach?
   ```
   public static void validatePortOrExit(OptionParser parser, String hostPort) {
       if (parser == null || hostPort == null || hostPort.isEmpty()) {
           CommandLineUtils.printVersionAndExit();
           return;
       }
   }
   ```
   Should I use printUsageAndExit if parser is null, or what would be better to indicate an error?
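
   For reference, a minimal sketch (illustrative, not quoted from the PR) of the exception-based alternative that shows up elsewhere in this thread as `validateBootstrapServer`; it fails fast on null/empty input instead of calling an exit helper, which also sidesteps the SpotBugs null-path warning. The message text is only illustrative:
   ```
   public static void validateBootstrapServer(String hostPort) throws IllegalArgumentException {
       if (hostPort == null || hostPort.isEmpty()) {
           throw new IllegalArgumentException("Error while validating the bootstrap address: " + hostPort);
       }
       // host:port parsing/validation of each entry would follow here
   }
   ```
   The caller can then decide how to exit, e.g. by catching the exception and calling `CommandLineUtils.printUsageAndExit(parser, e.getMessage())`, as the later `GetOffsetShell` diff in this thread does.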





[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1633242202

   @fvaleri, I did some work after your comments and still have a few questions.
   First, please tell me how you prepare your environment to run `$ bin/kafka-get-offsets.sh`. Is there any guide or quickstart for it? I don't yet know how to test my edits to that file correctly. I also need to know which options are considered deprecated; that's for the improvement you suggested.




[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1518054816

   Yes, since it's WIP and I have questions, I left the original files in place.
   Well, I'll take a look at `ClusterTestExtensions` and try to figure out more about testing there.




[GitHub] [kafka] dengziming commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1196474288


##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ZK)
+@Tag("integration")
+public class GetOffsetShellTest {
+    private final int topicCount = 4;
+    private final int offsetTopicPartitionCount = 4;
+    private final ClusterInstance cluster;
+
+    public GetOffsetShellTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    private String getTopicName(int i) {
+        return "topic" + i;
+    }
+
+    @BeforeEach
+    public void setUp() {

Review Comment:
   we should pass a `clusterConfig: ClusterConfig` parameter to setUp.
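
   A minimal sketch of that suggestion (the parameter type is `kafka.test.ClusterConfig`, injected by the extension; the body is left as a comment because what to configure is test-specific):
   ```
   @BeforeEach
   public void setUp(ClusterConfig clusterConfig) {
       // cluster settings (e.g. admin client properties) could be adjusted here
       // via the injected clusterConfig before the test interacts with the cluster
   }
   ```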





[GitHub] [kafka] fvaleri commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1645507764

   Hey @ruslankrivoshein, about the deprecation warning message: it looks like it is considered a public interface change, so a committer may ask you to remove it or to create a KIP. Please remove it for now. Sorry, my fault.




[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1270733830


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   I know you just translated this to Java, but I think we can add null/empty checks for the input parameters.





[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1711477537

   @dengziming, done.




[GitHub] [kafka] mimaison commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1285670300


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -99,4 +101,26 @@ public static void prettyPrintTable(
         printRow(columnLengths, headers, out);
         rows.forEach(row -> printRow(columnLengths, row, out));
     }
+
+    public static void validateBootstrapServer(String hostPort) throws IllegalArgumentException {

Review Comment:
   Can we update `ReplicaVerificationTool` to use this method too?
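
   A hedged sketch of that reuse inside `ReplicaVerificationTool`'s option handling (the `brokerList` and `parser` names are assumptions about its existing code; the helper call mirrors the one already used in `GetOffsetShell` in this PR):
   ```
   try {
       ToolsUtils.validateBootstrapServer(brokerList);
   } catch (IllegalArgumentException e) {
       CommandLineUtils.printUsageAndExit(parser, e.getMessage());
   }
   ```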



##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    private static final Pattern TOPIC_PARTITION_PATTERN = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException, TerseException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        GetOffsetShellOptions options = new GetOffsetShellOptions(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets(options);
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private static class GetOffsetShellOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> topicPartitionsOpt;
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> partitionsOpt;
+        private final OptionSpec<String> timeOpt;
+        private final OptionSpec<String> commandConfigOpt;
+        private final OptionSpec<String> effectiveBrokerListOpt;
+        private final OptionSpecBuilder excludeInternalTopicsOpt;
+
+        public GetOffsetShellOptions(String[] args) throws TerseException {
+            super(args);
+
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + TOPIC_PARTITION_PATTERN + "'." +
+                            " The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
+                            " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
+                    .withRequiredArg()
+                    .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("partition ids")
+                    .ofType(String.class);
+            timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
+                    .withRequiredArg()
+                    .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                    .ofType(String.class)
+                    .defaultsTo("latest");
+            commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
+
+            if (args.length == 0) {
+                CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
+            }
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                throw new TerseException(e.getMessage());
+            }
+
+            if (options.has(bootstrapServerOpt)) {
+                effectiveBrokerListOpt = bootstrapServerOpt;
+            } else {
+                effectiveBrokerListOpt = brokerListOpt;
+            }
+
+            CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
+
+            String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+            try {
+                ToolsUtils.validateBootstrapServer(brokerList);
+            } catch (IllegalArgumentException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+        }
+
+        public boolean hasTopicPartitionsOpt() {
+            return options.has(topicPartitionsOpt);
+        }
+
+        public String topicPartitionsOpt() {
+            return options.valueOf(topicPartitionsOpt);
+        }
+
+        public boolean hasTopicOpt() {
+            return options.has(topicOpt);
+        }
+
+        public String topicOpt() {
+            return options.valueOf(topicOpt);
+        }
+
+        public boolean hasPartitionsOpt() {
+            return options.has(partitionsOpt);
+        }
+
+        public String partitionsOpt() {
+            return options.valueOf(partitionsOpt);
+        }
+
+        public String timeOpt() {
+            return options.valueOf(timeOpt);
+        }
+
+        public boolean hasCommandConfigOpt() {
+            return options.has(commandConfigOpt);
+        }
+
+        public String commandConfigOpt() {
+            return options.valueOf(commandConfigOpt);
+        }
+
+        public String effectiveBrokerListOpt() {
+            return options.valueOf(effectiveBrokerListOpt);
+        }
+
+        public boolean hasExcludeInternalTopicsOpt() {
+            return options.has(excludeInternalTopicsOpt);
+        }
+    }
+
+    public Map<TopicPartition, Long> fetchOffsets(GetOffsetShellOptions options) throws IOException, ExecutionException, InterruptedException, TerseException {
+        String clientId = "GetOffsetShell";
+        String brokerList = options.effectiveBrokerListOpt();
+
+        if (options.hasTopicPartitionsOpt() && (options.hasTopicOpt() || options.hasPartitionsOpt())) {
+            throw new TerseException("--topic-partitions cannot be used with --topic or --partitions");
+        }
+
+        boolean excludeInternalTopics = options.hasExcludeInternalTopicsOpt();
+        OffsetSpec offsetSpec = parseOffsetSpec(options.timeOpt());
+
+        TopicPartitionFilter topicPartitionFilter;
+
+        if (options.hasTopicPartitionsOpt()) {
+            topicPartitionFilter = createTopicPartitionFilterWithPatternList(options.topicPartitionsOpt());
+        } else {
+            topicPartitionFilter = createTopicPartitionFilterWithTopicAndPartitionPattern(
+                    options.hasTopicOpt() ? options.topicOpt() : null,

Review Comment:
   I think `options.topicOpt()` returns `null` if it's unset, so maybe we don't need to check `options.hasTopicOpt()`?
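
   If that holds (joptsimple returns `null` for an option that is absent and has no default), the call could be simplified to:
   ```
   topicPartitionFilter = createTopicPartitionFilterWithTopicAndPartitionPattern(
           options.topicOpt(),
           options.partitionsOpt()
   );
   ```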



##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    private static final Pattern TOPIC_PARTITION_PATTERN = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException, TerseException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        GetOffsetShellOptions options = new GetOffsetShellOptions(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets(options);
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private static class GetOffsetShellOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> topicPartitionsOpt;
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> partitionsOpt;
+        private final OptionSpec<String> timeOpt;
+        private final OptionSpec<String> commandConfigOpt;
+        private final OptionSpec<String> effectiveBrokerListOpt;
+        private final OptionSpecBuilder excludeInternalTopicsOpt;
+
+        public GetOffsetShellOptions(String[] args) throws TerseException {
+            super(args);
+
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + TOPIC_PARTITION_PATTERN + "'." +
+                            " The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
+                            " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
+                    .withRequiredArg()
+                    .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("partition ids")
+                    .ofType(String.class);
+            timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
+                    .withRequiredArg()
+                    .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                    .ofType(String.class)
+                    .defaultsTo("latest");
+            commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
+
+            if (args.length == 0) {
+                CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
+            }
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                throw new TerseException(e.getMessage());
+            }
+
+            if (options.has(bootstrapServerOpt)) {
+                effectiveBrokerListOpt = bootstrapServerOpt;
+            } else {
+                effectiveBrokerListOpt = brokerListOpt;
+            }
+
+            CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
+
+            String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+            try {
+                ToolsUtils.validateBootstrapServer(brokerList);
+            } catch (IllegalArgumentException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+        }
+
+        public boolean hasTopicPartitionsOpt() {
+            return options.has(topicPartitionsOpt);
+        }
+
+        public String topicPartitionsOpt() {
+            return options.valueOf(topicPartitionsOpt);
+        }
+
+        public boolean hasTopicOpt() {
+            return options.has(topicOpt);
+        }
+
+        public String topicOpt() {
+            return options.valueOf(topicOpt);
+        }
+
+        public boolean hasPartitionsOpt() {
+            return options.has(partitionsOpt);
+        }
+
+        public String partitionsOpt() {
+            return options.valueOf(partitionsOpt);
+        }
+
+        public String timeOpt() {
+            return options.valueOf(timeOpt);
+        }
+
+        public boolean hasCommandConfigOpt() {
+            return options.has(commandConfigOpt);
+        }
+
+        public String commandConfigOpt() {
+            return options.valueOf(commandConfigOpt);
+        }
+
+        public String effectiveBrokerListOpt() {
+            return options.valueOf(effectiveBrokerListOpt);
+        }
+
+        public boolean hasExcludeInternalTopicsOpt() {
+            return options.has(excludeInternalTopicsOpt);
+        }
+    }
+
+    public Map<TopicPartition, Long> fetchOffsets(GetOffsetShellOptions options) throws IOException, ExecutionException, InterruptedException, TerseException {
+        String clientId = "GetOffsetShell";
+        String brokerList = options.effectiveBrokerListOpt();
+
+        if (options.hasTopicPartitionsOpt() && (options.hasTopicOpt() || options.hasPartitionsOpt())) {
+            throw new TerseException("--topic-partitions cannot be used with --topic or --partitions");
+        }
+
+        boolean excludeInternalTopics = options.hasExcludeInternalTopicsOpt();
+        OffsetSpec offsetSpec = parseOffsetSpec(options.timeOpt());
+
+        TopicPartitionFilter topicPartitionFilter;
+
+        if (options.hasTopicPartitionsOpt()) {
+            topicPartitionFilter = createTopicPartitionFilterWithPatternList(options.topicPartitionsOpt());
+        } else {
+            topicPartitionFilter = createTopicPartitionFilterWithTopicAndPartitionPattern(
+                    options.hasTopicOpt() ? options.topicOpt() : null,
+                    options.partitionsOpt()
+            );
+        }
+
+        Properties config;
+
+        if (options.hasCommandConfigOpt()) {
+            config = Utils.loadProps(options.commandConfigOpt());
+        } else {
+            config = new Properties();
+        }
+
+        config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId);
+
+        try (Admin adminClient = Admin.create(config)) {
+            List<TopicPartition> partitionInfos = listPartitionInfos(adminClient, topicPartitionFilter, excludeInternalTopics);
+
+            if (partitionInfos.isEmpty()) {
+                throw new TerseException("Could not match any topic-partitions with the specified filters");
+            }
+
+            Map<TopicPartition, OffsetSpec> timestampsToSearch = partitionInfos.stream().collect(Collectors.toMap(tp -> tp, tp -> offsetSpec));
+
+            ListOffsetsResult listOffsetsResult = adminClient.listOffsets(timestampsToSearch);
+
+            TreeMap<TopicPartition, Long> partitionOffsets = new TreeMap<>(Comparator.comparing(TopicPartition::toString));
+
+            for (TopicPartition partition : partitionInfos) {
+                ListOffsetsResultInfo partitionInfo;
+
+                try {
+                    partitionInfo = listOffsetsResult.partitionResult(partition).get();
+                } catch (KafkaException e) {

Review Comment:
   I don't think this throws `KafkaException`. This throws `ExecutionException` and you need to check the cause exception instead.
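
   For anyone following the thread, a minimal self-contained sketch of the pattern this comment asks for; the future below is a hypothetical stand-in for `listOffsetsResult.partitionResult(partition)`, not the Admin client itself:

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;

   public class CauseUnwrapSketch {
       public static void main(String[] args) throws InterruptedException {
           // Stand-in for listOffsetsResult.partitionResult(partition): a future that has
           // already failed; the RuntimeException plays the role of the broker-side error.
           CompletableFuture<Long> partitionResult = new CompletableFuture<>();
           partitionResult.completeExceptionally(new RuntimeException("leader not available"));

           try {
               System.out.println("offset = " + partitionResult.get());
           } catch (ExecutionException e) {
               // get() does not throw the underlying exception directly; it arrives wrapped,
               // so the error to report (or to match on) is e.getCause().
               Throwable cause = e.getCause();
               System.err.println("Skip getting offsets due to error: " + cause.getMessage());
           }
       }
   }
   ```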



##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    private static final Pattern TOPIC_PARTITION_PATTERN = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException, TerseException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        GetOffsetShellOptions options = new GetOffsetShellOptions(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets(options);
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private static class GetOffsetShellOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> topicPartitionsOpt;
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> partitionsOpt;
+        private final OptionSpec<String> timeOpt;
+        private final OptionSpec<String> commandConfigOpt;
+        private final OptionSpec<String> effectiveBrokerListOpt;
+        private final OptionSpecBuilder excludeInternalTopicsOpt;
+
+        public GetOffsetShellOptions(String[] args) throws TerseException {
+            super(args);
+
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + TOPIC_PARTITION_PATTERN + "'." +
+                            " The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
+                            " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
+                    .withRequiredArg()
+                    .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("partition ids")
+                    .ofType(String.class);
+            timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
+                    .withRequiredArg()
+                    .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                    .ofType(String.class)
+                    .defaultsTo("latest");
+            commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
+
+            if (args.length == 0) {
+                CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
+            }
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                throw new TerseException(e.getMessage());
+            }
+
+            if (options.has(bootstrapServerOpt)) {
+                effectiveBrokerListOpt = bootstrapServerOpt;
+            } else {
+                effectiveBrokerListOpt = brokerListOpt;
+            }
+
+            CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
+
+            String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+            try {
+                ToolsUtils.validateBootstrapServer(brokerList);
+            } catch (IllegalArgumentException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+        }
+
+        public boolean hasTopicPartitionsOpt() {
+            return options.has(topicPartitionsOpt);
+        }
+
+        public String topicPartitionsOpt() {
+            return options.valueOf(topicPartitionsOpt);
+        }
+
+        public boolean hasTopicOpt() {
+            return options.has(topicOpt);
+        }
+
+        public String topicOpt() {
+            return options.valueOf(topicOpt);
+        }
+
+        public boolean hasPartitionsOpt() {
+            return options.has(partitionsOpt);
+        }
+
+        public String partitionsOpt() {
+            return options.valueOf(partitionsOpt);
+        }
+
+        public String timeOpt() {
+            return options.valueOf(timeOpt);
+        }
+
+        public boolean hasCommandConfigOpt() {
+            return options.has(commandConfigOpt);
+        }
+
+        public String commandConfigOpt() {
+            return options.valueOf(commandConfigOpt);
+        }
+
+        public String effectiveBrokerListOpt() {
+            return options.valueOf(effectiveBrokerListOpt);
+        }
+
+        public boolean hasExcludeInternalTopicsOpt() {
+            return options.has(excludeInternalTopicsOpt);
+        }
+    }
+
+    public Map<TopicPartition, Long> fetchOffsets(GetOffsetShellOptions options) throws IOException, ExecutionException, InterruptedException, TerseException {
+        String clientId = "GetOffsetShell";
+        String brokerList = options.effectiveBrokerListOpt();
+
+        if (options.hasTopicPartitionsOpt() && (options.hasTopicOpt() || options.hasPartitionsOpt())) {
+            throw new TerseException("--topic-partitions cannot be used with --topic or --partitions");
+        }
+
+        boolean excludeInternalTopics = options.hasExcludeInternalTopicsOpt();
+        OffsetSpec offsetSpec = parseOffsetSpec(options.timeOpt());
+
+        TopicPartitionFilter topicPartitionFilter;
+
+        if (options.hasTopicPartitionsOpt()) {
+            topicPartitionFilter = createTopicPartitionFilterWithPatternList(options.topicPartitionsOpt());
+        } else {
+            topicPartitionFilter = createTopicPartitionFilterWithTopicAndPartitionPattern(
+                    options.hasTopicOpt() ? options.topicOpt() : null,
+                    options.partitionsOpt()
+            );
+        }
+
+        Properties config;
+

Review Comment:
   Can we remove this empty line? 
   Also, we could use a ternary operator here to simplify the code a bit.
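
   A rough illustration of the suggested ternary, with `loadProps` stubbed out so the snippet compiles on its own (in the PR it would be `Utils.loadProps(options.commandConfigOpt())`):

   ```java
   import java.io.IOException;
   import java.util.Properties;

   public class TernaryConfigSketch {
       // Hypothetical stand-in for Utils.loadProps(path), kept trivial so the sketch runs alone.
       private static Properties loadProps(String path) throws IOException {
           return new Properties();
       }

       public static void main(String[] args) throws IOException {
           String commandConfigPath = args.length > 0 ? args[0] : null;

           // The if/else (and the blank line before it) collapses into one expression.
           Properties config = commandConfigPath != null
                   ? loadProps(commandConfigPath)
                   : new Properties();

           System.out.println("loaded " + config.size() + " properties");
       }
   }
   ```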



##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    private static final Pattern TOPIC_PARTITION_PATTERN = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException, TerseException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        GetOffsetShellOptions options = new GetOffsetShellOptions(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets(options);
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private static class GetOffsetShellOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> topicPartitionsOpt;
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> partitionsOpt;
+        private final OptionSpec<String> timeOpt;
+        private final OptionSpec<String> commandConfigOpt;
+        private final OptionSpec<String> effectiveBrokerListOpt;
+        private final OptionSpecBuilder excludeInternalTopicsOpt;
+
+        public GetOffsetShellOptions(String[] args) throws TerseException {
+            super(args);
+
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + TOPIC_PARTITION_PATTERN + "'." +
+                            " The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
+                            " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
+                    .withRequiredArg()
+                    .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("partition ids")
+                    .ofType(String.class);
+            timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
+                    .withRequiredArg()
+                    .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                    .ofType(String.class)
+                    .defaultsTo("latest");
+            commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
+
+            if (args.length == 0) {
+                CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
+            }
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                throw new TerseException(e.getMessage());
+            }
+
+            if (options.has(bootstrapServerOpt)) {
+                effectiveBrokerListOpt = bootstrapServerOpt;
+            } else {
+                effectiveBrokerListOpt = brokerListOpt;
+            }
+
+            CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
+
+            String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+            try {
+                ToolsUtils.validateBootstrapServer(brokerList);
+            } catch (IllegalArgumentException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+        }
+
+        public boolean hasTopicPartitionsOpt() {
+            return options.has(topicPartitionsOpt);
+        }
+
+        public String topicPartitionsOpt() {
+            return options.valueOf(topicPartitionsOpt);
+        }
+
+        public boolean hasTopicOpt() {
+            return options.has(topicOpt);
+        }
+
+        public String topicOpt() {
+            return options.valueOf(topicOpt);
+        }
+
+        public boolean hasPartitionsOpt() {
+            return options.has(partitionsOpt);
+        }
+
+        public String partitionsOpt() {
+            return options.valueOf(partitionsOpt);
+        }
+
+        public String timeOpt() {
+            return options.valueOf(timeOpt);
+        }
+
+        public boolean hasCommandConfigOpt() {
+            return options.has(commandConfigOpt);
+        }
+
+        public String commandConfigOpt() {
+            return options.valueOf(commandConfigOpt);
+        }
+
+        public String effectiveBrokerListOpt() {
+            return options.valueOf(effectiveBrokerListOpt);
+        }
+
+        public boolean hasExcludeInternalTopicsOpt() {
+            return options.has(excludeInternalTopicsOpt);
+        }
+    }
+
+    public Map<TopicPartition, Long> fetchOffsets(GetOffsetShellOptions options) throws IOException, ExecutionException, InterruptedException, TerseException {
+        String clientId = "GetOffsetShell";
+        String brokerList = options.effectiveBrokerListOpt();
+
+        if (options.hasTopicPartitionsOpt() && (options.hasTopicOpt() || options.hasPartitionsOpt())) {
+            throw new TerseException("--topic-partitions cannot be used with --topic or --partitions");
+        }
+
+        boolean excludeInternalTopics = options.hasExcludeInternalTopicsOpt();
+        OffsetSpec offsetSpec = parseOffsetSpec(options.timeOpt());
+
+        TopicPartitionFilter topicPartitionFilter;
+
+        if (options.hasTopicPartitionsOpt()) {
+            topicPartitionFilter = createTopicPartitionFilterWithPatternList(options.topicPartitionsOpt());
+        } else {
+            topicPartitionFilter = createTopicPartitionFilterWithTopicAndPartitionPattern(
+                    options.hasTopicOpt() ? options.topicOpt() : null,
+                    options.partitionsOpt()
+            );
+        }
+
+        Properties config;
+
+        if (options.hasCommandConfigOpt()) {
+            config = Utils.loadProps(options.commandConfigOpt());
+        } else {
+            config = new Properties();
+        }
+
+        config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId);
+
+        try (Admin adminClient = Admin.create(config)) {
+            List<TopicPartition> partitionInfos = listPartitionInfos(adminClient, topicPartitionFilter, excludeInternalTopics);
+
+            if (partitionInfos.isEmpty()) {
+                throw new TerseException("Could not match any topic-partitions with the specified filters");
+            }
+
+            Map<TopicPartition, OffsetSpec> timestampsToSearch = partitionInfos.stream().collect(Collectors.toMap(tp -> tp, tp -> offsetSpec));
+
+            ListOffsetsResult listOffsetsResult = adminClient.listOffsets(timestampsToSearch);
+
+            TreeMap<TopicPartition, Long> partitionOffsets = new TreeMap<>(Comparator.comparing(TopicPartition::toString));
+
+            for (TopicPartition partition : partitionInfos) {
+                ListOffsetsResultInfo partitionInfo;
+
+                try {
+                    partitionInfo = listOffsetsResult.partitionResult(partition).get();
+                } catch (KafkaException e) {
+                    System.err.println("Skip getting offsets for topic-partition " + partition.toString() + " due to error: " + e.getMessage());
+
+                    continue;
+                } catch (InterruptedException | ExecutionException ignored) {
+                    continue;

Review Comment:
   In the original logic, we threw `e` here. Why are we ignoring it now?
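
   A small standalone sketch of the difference being pointed out, using plain futures as stand-ins for the Admin client results; swallowing the exception moves on silently, while rethrowing lets the tool's top-level handler report the failure:

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;

   public class RethrowSketch {
       public static void main(String[] args) throws InterruptedException, ExecutionException {
           CompletableFuture<Long> ok = CompletableFuture.completedFuture(42L);
           CompletableFuture<Long> failed = new CompletableFuture<>();
           failed.completeExceptionally(new IllegalStateException("request timed out"));
           List<CompletableFuture<Long>> results = Arrays.asList(ok, failed);

           for (CompletableFuture<Long> result : results) {
               try {
                   System.out.println("offset = " + result.get());
               } catch (InterruptedException | ExecutionException e) {
                   // The moved code silently 'continue's here; rethrowing (as the reviewer says
                   // the original logic did) surfaces the failure instead of hiding it.
                   throw e;
               }
           }
       }
   }
   ```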



##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    private static final Pattern TOPIC_PARTITION_PATTERN = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException, TerseException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        GetOffsetShellOptions options = new GetOffsetShellOptions(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets(options);
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private static class GetOffsetShellOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> topicPartitionsOpt;
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> partitionsOpt;
+        private final OptionSpec<String> timeOpt;
+        private final OptionSpec<String> commandConfigOpt;
+        private final OptionSpec<String> effectiveBrokerListOpt;
+        private final OptionSpecBuilder excludeInternalTopicsOpt;
+
+        public GetOffsetShellOptions(String[] args) throws TerseException {
+            super(args);
+
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + TOPIC_PARTITION_PATTERN + "'." +
+                            " The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
+                            " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
+                    .withRequiredArg()
+                    .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("partition ids")
+                    .ofType(String.class);
+            timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
+                    .withRequiredArg()
+                    .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                    .ofType(String.class)
+                    .defaultsTo("latest");
+            commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
+
+            if (args.length == 0) {
+                CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
+            }
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                throw new TerseException(e.getMessage());
+            }
+
+            if (options.has(bootstrapServerOpt)) {
+                effectiveBrokerListOpt = bootstrapServerOpt;
+            } else {
+                effectiveBrokerListOpt = brokerListOpt;
+            }
+
+            CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
+
+            String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+            try {
+                ToolsUtils.validateBootstrapServer(brokerList);
+            } catch (IllegalArgumentException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+        }
+
+        public boolean hasTopicPartitionsOpt() {
+            return options.has(topicPartitionsOpt);
+        }
+
+        public String topicPartitionsOpt() {
+            return options.valueOf(topicPartitionsOpt);
+        }
+
+        public boolean hasTopicOpt() {
+            return options.has(topicOpt);
+        }
+
+        public String topicOpt() {
+            return options.valueOf(topicOpt);
+        }
+
+        public boolean hasPartitionsOpt() {
+            return options.has(partitionsOpt);
+        }
+
+        public String partitionsOpt() {
+            return options.valueOf(partitionsOpt);
+        }
+
+        public String timeOpt() {
+            return options.valueOf(timeOpt);
+        }
+
+        public boolean hasCommandConfigOpt() {
+            return options.has(commandConfigOpt);
+        }
+
+        public String commandConfigOpt() {
+            return options.valueOf(commandConfigOpt);
+        }
+
+        public String effectiveBrokerListOpt() {
+            return options.valueOf(effectiveBrokerListOpt);
+        }
+
+        public boolean hasExcludeInternalTopicsOpt() {
+            return options.has(excludeInternalTopicsOpt);
+        }
+    }
+
+    public Map<TopicPartition, Long> fetchOffsets(GetOffsetShellOptions options) throws IOException, ExecutionException, InterruptedException, TerseException {
+        String clientId = "GetOffsetShell";
+        String brokerList = options.effectiveBrokerListOpt();
+
+        if (options.hasTopicPartitionsOpt() && (options.hasTopicOpt() || options.hasPartitionsOpt())) {
+            throw new TerseException("--topic-partitions cannot be used with --topic or --partitions");
+        }
+
+        boolean excludeInternalTopics = options.hasExcludeInternalTopicsOpt();
+        OffsetSpec offsetSpec = parseOffsetSpec(options.timeOpt());
+
+        TopicPartitionFilter topicPartitionFilter;
+
+        if (options.hasTopicPartitionsOpt()) {
+            topicPartitionFilter = createTopicPartitionFilterWithPatternList(options.topicPartitionsOpt());
+        } else {
+            topicPartitionFilter = createTopicPartitionFilterWithTopicAndPartitionPattern(
+                    options.hasTopicOpt() ? options.topicOpt() : null,
+                    options.partitionsOpt()
+            );
+        }
+
+        Properties config;
+
+        if (options.hasCommandConfigOpt()) {
+            config = Utils.loadProps(options.commandConfigOpt());
+        } else {
+            config = new Properties();
+        }
+
+        config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId);
+
+        try (Admin adminClient = Admin.create(config)) {
+            List<TopicPartition> partitionInfos = listPartitionInfos(adminClient, topicPartitionFilter, excludeInternalTopics);
+
+            if (partitionInfos.isEmpty()) {
+                throw new TerseException("Could not match any topic-partitions with the specified filters");

Review Comment:
   We used to throw `IllegalArgumentException`. Why are we changing it?
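
   For context on why the exception type is user-visible at all, here is a standalone sketch of the behaviour shown in `mainNoExit` above: a terse exception gets only its message printed, while any other throwable also gets a stack trace. `TerseLikeException` and `e.printStackTrace()` are hypothetical stand-ins for the tool's `TerseException` and `Utils.stackTrace`:

   ```java
   public class ExitBehaviourSketch {
       // Hypothetical stand-in for org.apache.kafka.tools.TerseException.
       static class TerseLikeException extends Exception {
           TerseLikeException(String message) {
               super(message);
           }
       }

       // Mirrors the shape of mainNoExit: terse errors print a one-liner, everything else
       // also dumps a stack trace.
       static int run(boolean failTersely) {
           try {
               if (failTersely) {
                   throw new TerseLikeException("Could not match any topic-partitions with the specified filters");
               }
               throw new IllegalArgumentException("Could not match any topic-partitions with the specified filters");
           } catch (TerseLikeException e) {
               System.err.println("Error occurred: " + e.getMessage());
               return 1;
           } catch (Throwable e) {
               System.err.println("Error occurred: " + e.getMessage());
               e.printStackTrace();
               return 1;
           }
       }

       public static void main(String[] args) {
           run(true);   // message only
           run(false);  // message plus stack trace
       }
   }
   ```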



##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    private static final Pattern TOPIC_PARTITION_PATTERN = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException, TerseException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        GetOffsetShellOptions options = new GetOffsetShellOptions(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets(options);
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private static class GetOffsetShellOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> topicPartitionsOpt;
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> partitionsOpt;
+        private final OptionSpec<String> timeOpt;
+        private final OptionSpec<String> commandConfigOpt;
+        private final OptionSpec<String> effectiveBrokerListOpt;
+        private final OptionSpecBuilder excludeInternalTopicsOpt;
+
+        public GetOffsetShellOptions(String[] args) throws TerseException {
+            super(args);
+
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + TOPIC_PARTITION_PATTERN + "'." +
+                            " The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
+                            " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
+                    .withRequiredArg()
+                    .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("partition ids")
+                    .ofType(String.class);
+            timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
+                    .withRequiredArg()
+                    .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                    .ofType(String.class)
+                    .defaultsTo("latest");
+            commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
+
+            if (args.length == 0) {
+                CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
+            }
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                throw new TerseException(e.getMessage());
+            }
+
+            if (options.has(bootstrapServerOpt)) {
+                effectiveBrokerListOpt = bootstrapServerOpt;
+            } else {
+                effectiveBrokerListOpt = brokerListOpt;
+            }
+
+            CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
+
+            String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+            try {
+                ToolsUtils.validateBootstrapServer(brokerList);
+            } catch (IllegalArgumentException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+        }
+
+        public boolean hasTopicPartitionsOpt() {
+            return options.has(topicPartitionsOpt);
+        }
+
+        public String topicPartitionsOpt() {
+            return options.valueOf(topicPartitionsOpt);
+        }
+
+        public boolean hasTopicOpt() {
+            return options.has(topicOpt);
+        }
+
+        public String topicOpt() {
+            return options.valueOf(topicOpt);
+        }
+
+        public boolean hasPartitionsOpt() {
+            return options.has(partitionsOpt);
+        }
+
+        public String partitionsOpt() {
+            return options.valueOf(partitionsOpt);
+        }
+
+        public String timeOpt() {
+            return options.valueOf(timeOpt);
+        }
+
+        public boolean hasCommandConfigOpt() {
+            return options.has(commandConfigOpt);
+        }
+
+        public String commandConfigOpt() {
+            return options.valueOf(commandConfigOpt);
+        }
+
+        public String effectiveBrokerListOpt() {
+            return options.valueOf(effectiveBrokerListOpt);
+        }
+
+        public boolean hasExcludeInternalTopicsOpt() {
+            return options.has(excludeInternalTopicsOpt);
+        }
+    }
+
+    public Map<TopicPartition, Long> fetchOffsets(GetOffsetShellOptions options) throws IOException, ExecutionException, InterruptedException, TerseException {
+        String clientId = "GetOffsetShell";
+        String brokerList = options.effectiveBrokerListOpt();
+
+        if (options.hasTopicPartitionsOpt() && (options.hasTopicOpt() || options.hasPartitionsOpt())) {
+            throw new TerseException("--topic-partitions cannot be used with --topic or --partitions");
+        }
+
+        boolean excludeInternalTopics = options.hasExcludeInternalTopicsOpt();
+        OffsetSpec offsetSpec = parseOffsetSpec(options.timeOpt());
+
+        TopicPartitionFilter topicPartitionFilter;
+
+        if (options.hasTopicPartitionsOpt()) {
+            topicPartitionFilter = createTopicPartitionFilterWithPatternList(options.topicPartitionsOpt());
+        } else {
+            topicPartitionFilter = createTopicPartitionFilterWithTopicAndPartitionPattern(
+                    options.hasTopicOpt() ? options.topicOpt() : null,
+                    options.partitionsOpt()
+            );
+        }
+
+        Properties config;
+
+        if (options.hasCommandConfigOpt()) {
+            config = Utils.loadProps(options.commandConfigOpt());
+        } else {
+            config = new Properties();
+        }
+
+        config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId);
+
+        try (Admin adminClient = Admin.create(config)) {
+            List<TopicPartition> partitionInfos = listPartitionInfos(adminClient, topicPartitionFilter, excludeInternalTopics);
+
+            if (partitionInfos.isEmpty()) {
+                throw new TerseException("Could not match any topic-partitions with the specified filters");
+            }
+
+            Map<TopicPartition, OffsetSpec> timestampsToSearch = partitionInfos.stream().collect(Collectors.toMap(tp -> tp, tp -> offsetSpec));
+
+            ListOffsetsResult listOffsetsResult = adminClient.listOffsets(timestampsToSearch);
+
+            TreeMap<TopicPartition, Long> partitionOffsets = new TreeMap<>(Comparator.comparing(TopicPartition::toString));
+
+            for (TopicPartition partition : partitionInfos) {
+                ListOffsetsResultInfo partitionInfo;
+
+                try {
+                    partitionInfo = listOffsetsResult.partitionResult(partition).get();
+                } catch (KafkaException e) {
+                    System.err.println("Skip getting offsets for topic-partition " + partition.toString() + " due to error: " + e.getMessage());
+
+                    continue;
+                } catch (InterruptedException | ExecutionException ignored) {
+                    continue;
+                }
+
+                if (partitionInfo.offset() != ListOffsetsResponse.UNKNOWN_OFFSET) {
+                    partitionOffsets.put(partition, partitionInfo.offset());
+                }
+            }
+
+            return partitionOffsets;
+        }
+    }
+
+    private OffsetSpec parseOffsetSpec(String listOffsetsTimestamp) throws TerseException {
+        switch (listOffsetsTimestamp) {
+            case "earliest":
+                return OffsetSpec.earliest();
+            case "latest":
+                return OffsetSpec.latest();
+            case "max-timestamp":
+                return OffsetSpec.maxTimestamp();
+            default:
+                long timestamp;
+
+                try {
+                    timestamp = Long.parseLong(listOffsetsTimestamp);
+                } catch (NumberFormatException e) {
+                    throw new TerseException("Malformed time argument " + listOffsetsTimestamp + ". " +
+                            "Please use -1 or latest / -2 or earliest / -3 or max-timestamp, or a specified long format timestamp");
+                }
+
+                if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
+                    return OffsetSpec.earliest();
+                } else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
+                    return OffsetSpec.latest();
+                } else if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+                    return OffsetSpec.maxTimestamp();
+                } else {
+                    return OffsetSpec.forTimestamp(timestamp);
+                }
+        }
+    }
+
+    /**
+     * Creates a topic-partition filter based on a list of patterns.
+     * Expected format:
+     * List: TopicPartitionPattern(, TopicPartitionPattern)*
+     * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+     * TopicPattern: REGEX
+     * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+     */
+    public TopicPartitionFilter createTopicPartitionFilterWithPatternList(String topicPartitions) {
+        List<String> ruleSpecs = Arrays.asList(topicPartitions.split(","));
+        List<TopicPartitionFilter> rules = ruleSpecs.stream().map(ruleSpec -> {
+            try {
+                return parseRuleSpec(ruleSpec);
+            } catch (TerseException e) {
+                throw new RuntimeException(e);
+            }
+        }).collect(Collectors.toList());
+
+        return new CompositeTopicPartitionFilter(rules);
+    }
+
+    /**
+     * Creates a topic-partition filter based on a topic pattern and a set of partition ids.
+     */
+    public TopicPartitionFilter createTopicPartitionFilterWithTopicAndPartitionPattern(String topicOpt, String partitionIds) throws TerseException {
+        return new TopicFilterAndPartitionFilter(
+                new IncludeList(topicOpt != null ? topicOpt : ".*"),
+                new PartitionsSetFilter(createPartitionSet(partitionIds))
+        );
+    }
+
+    private Set<Integer> createPartitionSet(String partitionsString) throws TerseException {
+        Set<Integer> partitions;
+
+        if (partitionsString == null || partitionsString.isEmpty()) {
+            partitions = Collections.emptySet();
+        } else {
+            try {
+                partitions = Arrays.stream(partitionsString.split(",")).map(Integer::parseInt).collect(Collectors.toSet());
+            } catch (NumberFormatException e) {
+                throw new TerseException("--partitions expects a comma separated list of numeric " +
+                        "partition ids, but received: " + partitionsString);
+            }
+        }
+
+        return partitions;
+    }
+
+    /**
+     * Return the partition infos. Filter them with topicPartitionFilter.
+     */
+    private List<TopicPartition> listPartitionInfos(
+            Admin client,
+            TopicPartitionFilter topicPartitionFilter,
+            boolean excludeInternalTopics
+    ) throws ExecutionException, InterruptedException {
+        ListTopicsOptions listTopicsOptions = new ListTopicsOptions().listInternal(!excludeInternalTopics);
+        Set<String> topics = client.listTopics(listTopicsOptions).names().get();
+        Set<String> filteredTopics = topics.stream().filter(topicPartitionFilter::isTopicAllowed).collect(Collectors.toSet());
+
+        return client.describeTopics(filteredTopics).allTopicNames().get().entrySet().stream().flatMap(
+                topic -> topic.getValue().partitions().stream().map(
+                        tp -> new TopicPartition(topic.getKey(), tp.partition())
+                ).filter(topicPartitionFilter::isTopicPartitionAllowed)
+        ).collect(Collectors.toList());
+    }
+
+    private TopicPartitionFilter parseRuleSpec(String ruleSpec) throws TerseException, RuntimeException {
+        Matcher matcher = TOPIC_PARTITION_PATTERN.matcher(ruleSpec);
+
+        if (!matcher.matches())
+            throw new TerseException("Invalid rule specification: " + ruleSpec);

Review Comment:
   We used to throw `IllegalArgumentException`; why are we using `TerseException` now?





[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1665982738

   @fvaleri I summon thee. Please take one more look.




[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1259668871


##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    Pattern topicPartitionPattern = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        getOffsetShell.parseArgs(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets();
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private OptionSet options;
+    private OptionSpec<String> topicPartitionsOpt;

Review Comment:
   Good point, thank you.
   Speaking of JmxTool, or more precisely JmxToolTest: there is an [execute](https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java#L354) method that just duplicates [this one](https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java#L33), and I don't know why. Could you explain the reasoning behind that approach?
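   (For context, such `execute` helpers typically just capture stdout while the tool runs. The sketch below is hypothetical and only illustrates that pattern; it is not the actual JmxToolTest or ToolsTestUtils code.)
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.PrintStream;
   
   public class CaptureStdOutSketch {
       // Hypothetical helper: run a tool and return whatever it printed to stdout.
       static String execute(Runnable tool) {
           PrintStream original = System.out;
           ByteArrayOutputStream buffer = new ByteArrayOutputStream();
           try (PrintStream capture = new PrintStream(buffer)) {
               System.setOut(capture);   // redirect stdout into the buffer
               tool.run();               // e.g. () -> GetOffsetShell.mainNoExit(args)
           } finally {
               System.setOut(original);  // always restore the real stdout
           }
           return buffer.toString().trim();
       }
   }
   ```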





[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1271470073


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   So you also mean it should be just moved from kafka/tests/core to kafka/tests/tools?





[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1645561485

   ```
   $ ./gradlew :tools:checkstyleMain
   ... output omitted
   BUILD SUCCESSFUL
   
   $ ./gradlew clean systemTestLibs
   ... output omitted
   BUILD SUCCESSFUL
   tests run:        10
   passed:           10
   ```
   I hope it's done.
   
   Then I restored the error messages and removed the usage printout. However, I also get messages from SLF4J here. Have you seen anything like this before? Could you recommend a way to fix it locally?
   ```
   $ bin/kafka-get-offsets.sh --broker-list :9092 --topic __consumer_offsets --time -1 --exclude-internal-topics
   WARNING: The 'broker-list' option is deprecated and will be removed in the next major release. Use the `bootstrap-server` option with the same syntax.
   SLF4J: Class path contains multiple SLF4J bindings.
   SLF4J: Found binding in [jar:file:/home/rkrivoshein/kafka/tools/build/dependant-libs-2.13.11/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in [jar:file:/home/rkrivoshein/kafka/trogdor/build/dependant-libs-2.13.11/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
   SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
   Error occurred: Could not match any topic-partitions with the specified filters
   ```
   @fvaleri, please take a look.
   




[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1271258247


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   There are only a couple of FQCNs used in tests/kafkatest/services/kafka/kafka.py, but here we are only concerned with kafka-get-offsets; the other will be addressed in its own PR. The change is very simple in the end:
   
   ```sh
   diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
   index f116f92c3b..ab48d1091d 100644
   --- a/tests/kafkatest/services/kafka/kafka.py
   +++ b/tests/kafkatest/services/kafka/kafka.py
   @@ -1777,8 +1777,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
            node = self.nodes[0]
    
            cmd = fix_opts_for_new_jvm(node)
   -        cmd += self.path.script("kafka-run-class.sh", node)
   -        cmd += " org.apache.kafka.tools.GetOffsetShell"
   +        cmd += self.path.script("kafka-get-offsets.sh", node)
            cmd += " --bootstrap-server %s" % self.bootstrap_servers(self.security_protocol)
    
            if time:
   ```





[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1285005842


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -99,4 +101,26 @@ public static void prettyPrintTable(
         printRow(columnLengths, headers, out);
         rows.forEach(row -> printRow(columnLengths, row, out));
     }
+
+    public static void validateBootstrapServer(String hostPort) throws IllegalArgumentException {
+        if (hostPort == null || hostPort.isEmpty()) {

Review Comment:
   Since Java 11 we could use `isBlank()`, which also ignores whitespace. Unfortunately, we still need to support Java 8, where we can achieve the same with `hostPort.trim().isEmpty()`.
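   A minimal sketch of that check (illustrative only; the method name and message are placeholders, not the PR's final code):
   
   ```java
   public final class BootstrapServerValidationSketch {
       // Java 8-compatible blank check: trim() + isEmpty() stands in for String.isBlank().
       static void validateBootstrapServer(String hostPort) {
           if (hostPort == null || hostPort.trim().isEmpty()) {
               throw new IllegalArgumentException("Invalid bootstrap server value: " + hostPort);
           }
       }
   }
   ```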





[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1280446002


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   I don't see the code, so I can't comment further, but this reveals a programming error.





[GitHub] [kafka] dengziming commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1659819950

   @ruslankrivoshein There are still conflicts in this PR, can you rebase it again?




[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1286654127


##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    private static final Pattern TOPIC_PARTITION_PATTERN = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException, TerseException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        GetOffsetShellOptions options = new GetOffsetShellOptions(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets(options);
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private static class GetOffsetShellOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> topicPartitionsOpt;
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> partitionsOpt;
+        private final OptionSpec<String> timeOpt;
+        private final OptionSpec<String> commandConfigOpt;
+        private final OptionSpec<String> effectiveBrokerListOpt;
+        private final OptionSpecBuilder excludeInternalTopicsOpt;
+
+        public GetOffsetShellOptions(String[] args) throws TerseException {
+            super(args);
+
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + TOPIC_PARTITION_PATTERN + "'." +
+                            " The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
+                            " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
+                    .withRequiredArg()
+                    .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("partition ids")
+                    .ofType(String.class);
+            timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
+                    .withRequiredArg()
+                    .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                    .ofType(String.class)
+                    .defaultsTo("latest");
+            commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
+
+            if (args.length == 0) {
+                CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
+            }
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                throw new TerseException(e.getMessage());
+            }
+
+            if (options.has(bootstrapServerOpt)) {
+                effectiveBrokerListOpt = bootstrapServerOpt;
+            } else {
+                effectiveBrokerListOpt = brokerListOpt;
+            }
+
+            CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
+
+            String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+            try {
+                ToolsUtils.validateBootstrapServer(brokerList);
+            } catch (IllegalArgumentException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+        }
+
+        public boolean hasTopicPartitionsOpt() {
+            return options.has(topicPartitionsOpt);
+        }
+
+        public String topicPartitionsOpt() {
+            return options.valueOf(topicPartitionsOpt);
+        }
+
+        public boolean hasTopicOpt() {
+            return options.has(topicOpt);
+        }
+
+        public String topicOpt() {
+            return options.valueOf(topicOpt);
+        }
+
+        public boolean hasPartitionsOpt() {
+            return options.has(partitionsOpt);
+        }
+
+        public String partitionsOpt() {
+            return options.valueOf(partitionsOpt);
+        }
+
+        public String timeOpt() {
+            return options.valueOf(timeOpt);
+        }
+
+        public boolean hasCommandConfigOpt() {
+            return options.has(commandConfigOpt);
+        }
+
+        public String commandConfigOpt() {
+            return options.valueOf(commandConfigOpt);
+        }
+
+        public String effectiveBrokerListOpt() {
+            return options.valueOf(effectiveBrokerListOpt);
+        }
+
+        public boolean hasExcludeInternalTopicsOpt() {
+            return options.has(excludeInternalTopicsOpt);
+        }
+    }
+
+    public Map<TopicPartition, Long> fetchOffsets(GetOffsetShellOptions options) throws IOException, ExecutionException, InterruptedException, TerseException {
+        String clientId = "GetOffsetShell";
+        String brokerList = options.effectiveBrokerListOpt();
+
+        if (options.hasTopicPartitionsOpt() && (options.hasTopicOpt() || options.hasPartitionsOpt())) {
+            throw new TerseException("--topic-partitions cannot be used with --topic or --partitions");
+        }
+
+        boolean excludeInternalTopics = options.hasExcludeInternalTopicsOpt();
+        OffsetSpec offsetSpec = parseOffsetSpec(options.timeOpt());
+
+        TopicPartitionFilter topicPartitionFilter;
+
+        if (options.hasTopicPartitionsOpt()) {
+            topicPartitionFilter = createTopicPartitionFilterWithPatternList(options.topicPartitionsOpt());
+        } else {
+            topicPartitionFilter = createTopicPartitionFilterWithTopicAndPartitionPattern(
+                    options.hasTopicOpt() ? options.topicOpt() : null,
+                    options.partitionsOpt()
+            );
+        }
+
+        Properties config;
+
+        if (options.hasCommandConfigOpt()) {
+            config = Utils.loadProps(options.commandConfigOpt());
+        } else {
+            config = new Properties();
+        }
+
+        config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId);
+
+        try (Admin adminClient = Admin.create(config)) {
+            List<TopicPartition> partitionInfos = listPartitionInfos(adminClient, topicPartitionFilter, excludeInternalTopics);
+
+            if (partitionInfos.isEmpty()) {
+                throw new TerseException("Could not match any topic-partitions with the specified filters");
+            }
+
+            Map<TopicPartition, OffsetSpec> timestampsToSearch = partitionInfos.stream().collect(Collectors.toMap(tp -> tp, tp -> offsetSpec));
+
+            ListOffsetsResult listOffsetsResult = adminClient.listOffsets(timestampsToSearch);
+
+            TreeMap<TopicPartition, Long> partitionOffsets = new TreeMap<>(Comparator.comparing(TopicPartition::toString));
+
+            for (TopicPartition partition : partitionInfos) {
+                ListOffsetsResultInfo partitionInfo;
+
+                try {
+                    partitionInfo = listOffsetsResult.partitionResult(partition).get();
+                } catch (KafkaException e) {
+                    System.err.println("Skip getting offsets for topic-partition " + partition.toString() + " due to error: " + e.getMessage());
+
+                    continue;
+                } catch (InterruptedException | ExecutionException ignored) {
+                    continue;
+                }
+
+                if (partitionInfo.offset() != ListOffsetsResponse.UNKNOWN_OFFSET) {
+                    partitionOffsets.put(partition, partitionInfo.offset());
+                }
+            }
+
+            return partitionOffsets;
+        }
+    }
+
+    private OffsetSpec parseOffsetSpec(String listOffsetsTimestamp) throws TerseException {
+        switch (listOffsetsTimestamp) {
+            case "earliest":
+                return OffsetSpec.earliest();
+            case "latest":
+                return OffsetSpec.latest();
+            case "max-timestamp":
+                return OffsetSpec.maxTimestamp();
+            default:
+                long timestamp;
+
+                try {
+                    timestamp = Long.parseLong(listOffsetsTimestamp);
+                } catch (NumberFormatException e) {
+                    throw new TerseException("Malformed time argument " + listOffsetsTimestamp + ". " +
+                            "Please use -1 or latest / -2 or earliest / -3 or max-timestamp, or a specified long format timestamp");
+                }
+
+                if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
+                    return OffsetSpec.earliest();
+                } else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
+                    return OffsetSpec.latest();
+                } else if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+                    return OffsetSpec.maxTimestamp();
+                } else {
+                    return OffsetSpec.forTimestamp(timestamp);
+                }
+        }
+    }
+
+    /**
+     * Creates a topic-partition filter based on a list of patterns.
+     * Expected format:
+     * List: TopicPartitionPattern(, TopicPartitionPattern)*
+     * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+     * TopicPattern: REGEX
+     * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+     */
+    public TopicPartitionFilter createTopicPartitionFilterWithPatternList(String topicPartitions) {
+        List<String> ruleSpecs = Arrays.asList(topicPartitions.split(","));
+        List<TopicPartitionFilter> rules = ruleSpecs.stream().map(ruleSpec -> {
+            try {
+                return parseRuleSpec(ruleSpec);
+            } catch (TerseException e) {
+                throw new RuntimeException(e);
+            }
+        }).collect(Collectors.toList());
+
+        return new CompositeTopicPartitionFilter(rules);
+    }
+
+    /**
+     * Creates a topic-partition filter based on a topic pattern and a set of partition ids.
+     */
+    public TopicPartitionFilter createTopicPartitionFilterWithTopicAndPartitionPattern(String topicOpt, String partitionIds) throws TerseException {
+        return new TopicFilterAndPartitionFilter(
+                new IncludeList(topicOpt != null ? topicOpt : ".*"),
+                new PartitionsSetFilter(createPartitionSet(partitionIds))
+        );
+    }
+
+    private Set<Integer> createPartitionSet(String partitionsString) throws TerseException {
+        Set<Integer> partitions;
+
+        if (partitionsString == null || partitionsString.isEmpty()) {
+            partitions = Collections.emptySet();
+        } else {
+            try {
+                partitions = Arrays.stream(partitionsString.split(",")).map(Integer::parseInt).collect(Collectors.toSet());
+            } catch (NumberFormatException e) {
+                throw new TerseException("--partitions expects a comma separated list of numeric " +
+                        "partition ids, but received: " + partitionsString);
+            }
+        }
+
+        return partitions;
+    }
+
+    /**
+     * Return the partition infos. Filter them with topicPartitionFilter.
+     */
+    private List<TopicPartition> listPartitionInfos(
+            Admin client,
+            TopicPartitionFilter topicPartitionFilter,
+            boolean excludeInternalTopics
+    ) throws ExecutionException, InterruptedException {
+        ListTopicsOptions listTopicsOptions = new ListTopicsOptions().listInternal(!excludeInternalTopics);
+        Set<String> topics = client.listTopics(listTopicsOptions).names().get();
+        Set<String> filteredTopics = topics.stream().filter(topicPartitionFilter::isTopicAllowed).collect(Collectors.toSet());
+
+        return client.describeTopics(filteredTopics).allTopicNames().get().entrySet().stream().flatMap(
+                topic -> topic.getValue().partitions().stream().map(
+                        tp -> new TopicPartition(topic.getKey(), tp.partition())
+                ).filter(topicPartitionFilter::isTopicPartitionAllowed)
+        ).collect(Collectors.toList());
+    }
+
+    private TopicPartitionFilter parseRuleSpec(String ruleSpec) throws TerseException, RuntimeException {
+        Matcher matcher = TOPIC_PARTITION_PATTERN.matcher(ruleSpec);
+
+        if (!matcher.matches())
+            throw new TerseException("Invalid rule specification: " + ruleSpec);

Review Comment:
   ... and there was [a reason for that](https://github.com/apache/kafka/pull/13562#pullrequestreview-1518389971). The old code caught these exceptions and printed only the error message, whereas your code printed the error message plus an ugly stack trace. That's why I suggested using TerseException, as we do in other tools.
   





[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1271470073


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   Should it also be moved from kafka/tests/core to kafka/tests/tools?





[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1270873057


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   And what should it do when the value is null?
   Could you also tell me more about the improvement? I can't find an FQCN in my implementation.





[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1662850171

   @dengziming, could you tell me what a good approach to rebasing is here? When I rebase, I run into conflicts; after resolving them and pushing, dozens of commits show up in the PR, so I end up merging instead. Or is merging also acceptable?




[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1287993847


##########
core/src/main/scala/kafka/tools/GetOffsetShell.scala:
##########


Review Comment:
   > We should still keep this file can forward all args to new GetOffsetShell, see `FeatureCommand.scala`.
   
   Hi @dengziming, this tool was reported in the "missing wrapper script" category in [KIP-906](https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines), but that was only because some system tests depended on the FQCN. In this PR we are also changing that, so they now use the wrapper script instead, and I think there is no need for the redirection.
   








[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1548242140

   @dengziming, could you help me in this situation?
   I use `ClusterTestExtensions`, but it creates `__consumer_offsets` with 5 partitions. That number doesn't fit these tests, and I can't find a way to control it. When I delete the topic manually, it doesn't reappear; when I delete it and recreate it with a specific number of partitions, it does appear, but with 5 partitions again.
   What could I do here?
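   (One thing that may be worth trying, sketched below; this assumes the `ClusterTest` annotation accepts per-test `serverProperties` via `ClusterConfigProperty`, and that the broker config controlling this topic is `offsets.topic.num.partitions`.)
   
   ```java
   import kafka.test.ClusterInstance;
   import kafka.test.annotation.ClusterConfigProperty;
   import kafka.test.annotation.ClusterTest;
   
   public class OffsetsTopicPartitionsSketch {
       // Hedged sketch: pin __consumer_offsets to a single partition for this test.
       // serverProperties/ClusterConfigProperty are assumptions about the test framework.
       @ClusterTest(serverProperties = {
           @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1")
       })
       public void internalTopicHasOnePartition(ClusterInstance cluster) {
           // exercise GetOffsetShell against __consumer_offsets here
       }
   }
   ```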




[GitHub] [kafka] fvaleri commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1634715273

   Yes, but just use `System.out.println` without adding extra classes.




[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1634674221

   @fvaleri, would this approach be good?
   ```
   $ bin/kafka-get-offsets.sh --broker-list :9092 --topic my-top --time 3
   [2023-07-13 19:58:25,436] WARN The 'broker-list' option is deprecated and will be removed in the next major release. Use the `bootstrap-server` option with the same syntax. (org.apache.kafka.tools.GetOffsetShell)
   my-top:0:0
   ```




[GitHub] [kafka] dengziming commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1230929224


##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    Pattern topicPartitionPattern = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        getOffsetShell.parseArgs(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets();
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private OptionSet options;
+    private OptionSpec<String> topicPartitionsOpt;
+    private OptionSpec<String> topicOpt;
+    private OptionSpec<String> partitionsOpt;
+    private OptionSpec<String> timeOpt;
+    private OptionSpec<String> commandConfigOpt;
+    private OptionSpec<String> effectiveBrokerListOpt;
+    private OptionSpecBuilder excludeInternalTopicsOpt;
+
+    public void parseArgs(final String[] args) {
+        final OptionParser parser = new OptionParser(false);
+
+        OptionSpec<String> brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                .withRequiredArg()
+                .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                .ofType(String.class);
+        OptionSpec<String> bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                .requiredUnless("broker-list")
+                .withRequiredArg()
+                .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                .ofType(String.class);
+        topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + topicPartitionPattern + "'." +
+                        " The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
+                        " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
+                .withRequiredArg()
+                .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+                .ofType(String.class);
+        topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                .withRequiredArg()
+                .describedAs("topic")
+                .ofType(String.class);
+        partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                .withRequiredArg()
+                .describedAs("partition ids")
+                .ofType(String.class);
+        timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
+                .withRequiredArg()
+                .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                .ofType(String.class)
+                .defaultsTo("latest");
+        commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+        excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
+
+        if (args.length == 0) {
+            CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
+        }
+
+        options = parser.parse(args);
+
+        if (options.has(bootstrapServerOpt)) {
+            effectiveBrokerListOpt = bootstrapServerOpt;
+        } else {
+            effectiveBrokerListOpt = brokerListOpt;
+        }
+
+        CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
+
+        String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+        ToolsUtils.validatePortOrDie(parser, brokerList);
+    }
+
+    public Map<TopicPartition, Long> fetchOffsets() throws IOException, ExecutionException, InterruptedException {
+        String clientId = "GetOffsetShell";
+        String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+        if (options.has(topicPartitionsOpt) && (options.has(topicOpt) || options.has(partitionsOpt))) {
+            throw new IllegalArgumentException("--topic-partitions cannot be used with --topic or --partitions");
+        }
+
+        boolean excludeInternalTopics = options.has(excludeInternalTopicsOpt);
+        OffsetSpec offsetSpec = parseOffsetSpec(options.valueOf(timeOpt));
+
+        TopicPartitionFilter topicPartitionFilter;

Review Comment:
   I didn't find this class. Did you forget to add it?





[GitHub] [kafka] dengziming commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1711304414

   @ruslankrivoshein Maybe you didn't get my idea, I mean the `WIP` here:
   ![image](https://github.com/apache/kafka/assets/26023240/37051385-4e10-4d67-bfe1-d72a3913e70a)
   
   do you get it? 👀




[GitHub] [kafka] fvaleri commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1633948707

   >  how you prepare your environment to run $ bin/kafka-get-offsets.sh. Is there any guide or quickstart about it?
   
   From the Kafka project home dir, build with `./gradlew jar`, then start a local Kafka cluster as explained in the [official quickstart](https://kafka.apache.org/quickstart), and finally run `bin/kafka-get-offsets.sh` as I showed in my previous comment.
   
   > which options are considered to be deprecated
   
   For this tool, search for "deprecated" in the class and you'll find `broker-list`. It would be good to log a warning when a user tries to use it instead of `bootstrap-server`. I was thinking of something like: `WARNING: The 'broker-list' option is deprecated and will be removed in the next major release. Use the 'bootstrap-server' option with the same syntax.`
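
   A minimal sketch of how such a warning could be wired in after option parsing. This is only an illustration: `options`, `brokerListOpt`, and `bootstrapServerOpt` mirror the names used in `GetOffsetShellOptions`, and the message text is just the suggestion above.

   ```java
   import joptsimple.OptionSet;
   import joptsimple.OptionSpec;

   public final class DeprecationWarningSketch {
       // Sketch: warn only when the deprecated option is used on its own.
       static void maybeWarnDeprecatedBrokerList(OptionSet options,
                                                 OptionSpec<String> brokerListOpt,
                                                 OptionSpec<String> bootstrapServerOpt) {
           if (options.has(brokerListOpt) && !options.has(bootstrapServerOpt)) {
               System.err.println("WARNING: The 'broker-list' option is deprecated and will be removed "
                       + "in the next major release. Use the 'bootstrap-server' option with the same syntax.");
           }
       }
   }
   ```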


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1639869186

   Thanks @fvaleri and sorry for the really long delay from my side on the other PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1639858809

   Just wanted to let you know that TopicFilter and related classes have been merged, so you can rebase this PR from trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1635504373

   Hi @ruslankrivoshein, thanks for adding the warning message.
   
   Not sure if this is ready for another review, but I still see some issues.
   
   The checkstyle phase is still failing; I suggested a way to fix it in one of my previous comments.
   
   I don't see the stacktrace anymore in case of wrong options, but the error messages are not exactly the same as before. In general, we should match the old behavior when reporting errors.
   
   ```sh
   # before
   $ bin/kafka-get-offsets.sh --topic my-topic --time -3
   Error occurred: Missing required option(s) [bootstrap-server]
   
   # now
   $ bin/kafka-get-offsets.sh --topic my-topic --time -3
   Missing required option(s) [bootstrap-server]
   Option                                   Description                           
   ------                                   -----------                           
   --bootstrap-server <String: HOST1:       REQUIRED. The server(s) to connect to 
     PORT1,...,HOST3:PORT3>                   in the form HOST1:PORT1,HOST2:PORT2.
   --broker-list <String: HOST1:PORT1,...,  DEPRECATED, use --bootstrap-server    
     HOST3:PORT3>                             instead; ignored if --bootstrap-    
                                              server is specified. The server(s)  
                                              to connect to in the form HOST1:    
                                              PORT1,HOST2:PORT2. 
   ...
   
   # before
   $ bin/kafka-get-offsets.sh --broker-list :9092 --topic __consumer_offsets --time -1 --exclude-internal-topics
   Error occurred: Could not match any topic-partitions with the specified filters
   
   # now
   $ bin/kafka-get-offsets.sh --broker-list :9092 --topic __consumer_offsets --time -1 --exclude-internal-topics
   Could not match any topic-partitions with the specified filters
   ```
   
   Finally, the `tests/kafkatest/tests/core/get_offset_shell_test.py` system test fails because you also need to update the GetOffsetShell package in `tests/kafkatest/services/kafka/kafka.py`. You can run this test like this:
   
   ```sh
   ./gradlew clean systemTestLibs
   TC_PATHS="tests/kafkatest/tests/core/get_offset_shell_test.py" bash tests/docker/run_tests.sh
   ```
   
   Hope it helps. 
   Let me know when you are ready for another round of review.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1270907553


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   Do you have ideas about how it should look? I see 3 FQCNs there. What's the right way to put these names in that script?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1271787804


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   No, that's a service class that includes multiple tools.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1271470073


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   Ah, so you mean it should just be moved from kafka/tests/core to kafka/tests/tools?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1270884317


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   > And what should it do when null?
   
   The parser can be null, right? We don't want to throw an NPE at the user. You can print an error in that case and exit.
   
   The hostPort can be null or empty. Same story.
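
   A minimal sketch of what those defensive checks could look like. Only the method name and the `(OptionParser, String)` signature come from the diff; the class name and messages here are placeholders.

   ```java
   import joptsimple.OptionParser;
   import org.apache.kafka.common.utils.Exit;

   public final class ToolsUtilsSketch {
       // Sketch: report a readable error and exit instead of throwing an NPE
       // when either argument is missing; the real host:port validation follows.
       public static void validatePortOrExit(OptionParser parser, String hostPort) {
           if (hostPort == null || hostPort.isEmpty()) {
               System.err.println("Error: a bootstrap server in the form HOST:PORT is required");
               Exit.exit(1);
           }
           if (parser == null) {
               System.err.println("Error: no option parser available to print usage");
               Exit.exit(1);
           }
           // ... existing host:port validation goes here
       }
   }
   ```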
   
   > Could you also tell me more about improvement? I can't find FQCN in my implementation. And how it must be corrected?
   
   It's in tests/kafkatest/services/kafka/kafka.py, where you already changed the package name. My suggestion is to use the wrapper script, instead of the FQCN. That way, if the package name needs to be changed again in the future, we won't need to touch the system test.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1197583887


##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ZK)
+@Tag("integration")
+public class GetOffsetShellTest {
+    private final int topicCount = 4;
+    private final int offsetTopicPartitionCount = 4;
+    private final ClusterInstance cluster;
+
+    public GetOffsetShellTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    private String getTopicName(int i) {
+        return "topic" + i;
+    }
+
+    @BeforeEach
+    public void setUp() {

Review Comment:
   Yes, we can get the cluster variable, but in this approach the `adminClientProperties` in `ClusterConfig` isn't initialized yet. I can't figure out where it gets initialized...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dengziming commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1197869315


##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ZK)
+@Tag("integration")
+public class GetOffsetShellTest {
+    private final int topicCount = 4;
+    private final int offsetTopicPartitionCount = 4;
+    private final ClusterInstance cluster;
+
+    public GetOffsetShellTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    private String getTopicName(int i) {
+        return "topic" + i;
+    }
+
+    @BeforeEach
+    public void setUp() {

Review Comment:
   Yes, we can't create topics in `setUp`, since `setUp` is invoked before we init the cluster; we can only override some configs to control its behavior, for example:
   ```
       clusterConfig.serverProperties().put("auto.create.topics.enable", false);
       clusterConfig.serverProperties().put("offsets.topic.replication.factor", "1");
       clusterConfig.serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount));
   ```
   With these configs you can create `__consumer_offsets` as you want. If you really want to control when it's created, you should override `setUp` in `ZkClusterInvocationContext.getAdditionalExtensions.IntegrationTestHarness` and change `doSetup(testInfo, createOffsetsTopic = true)` to `doSetup(testInfo, createOffsetsTopic = false)`, but I think that's unnecessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1197583887


##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ZK)
+@Tag("integration")
+public class GetOffsetShellTest {
+    private final int topicCount = 4;
+    private final int offsetTopicPartitionCount = 4;
+    private final ClusterInstance cluster;
+
+    public GetOffsetShellTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    private String getTopicName(int i) {
+        return "topic" + i;
+    }
+
+    @BeforeEach
+    public void setUp() {

Review Comment:
   Yes, we can get the cluster variable, but in this approach the `adminClientProperties` in `ClusterConfig` isn't initialized yet, as in `ApiVersionsRequestTest`. I can't figure out where it gets initialized...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1216997701


##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ZK)
+@Tag("integration")
+public class GetOffsetShellTest {
+    private final int topicCount = 4;
+    private final int offsetTopicPartitionCount = 4;
+    private final ClusterInstance cluster;
+
+    public GetOffsetShellTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    private String getTopicName(int i) {
+        return "topic" + i;
+    }
+
+    public void setUp() {
+        cluster.config().serverProperties().put("auto.create.topics.enable", false);
+        cluster.config().serverProperties().put("offsets.topic.replication.factor", "1");
+        cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount));
+
+        try (Admin admin = Admin.create(cluster.config().adminClientProperties())) {
+            List<NewTopic> topics = new ArrayList<>();
+
+            IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short) 1)));
+
+            admin.createTopics(topics);
+        }
+
+        Properties props = new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.config().producerProperties().get("bootstrap.servers"));
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+            IntStream.range(0, topicCount + 1)
+                .forEach(i -> IntStream.range(0, i * i)
+                        .forEach(msgCount -> producer.send(
+                                new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)))
+                );
+        }
+    }
+
+    static class Row {
+        private String name;
+        private int partition;
+        private Long timestamp;
+
+        public Row(String name, int partition, Long timestamp) {
+            this.name = name;
+            this.partition = partition;
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) return true;
+
+            if (!(o instanceof Row)) return false;
+
+            Row r = (Row) o;
+
+            return name.equals(r.name) && partition == r.partition && Objects.equals(timestamp, r.timestamp);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(name, partition, timestamp);
+        }
+    }
+
+    @ClusterTest
+    public void testNoFilterOptions() {
+        setUp();
+
+        List<Row> output = executeAndParse();
+
+        assertEquals(expectedOffsetsWithInternal(), output);
+    }
+
+    @ClusterTest
+    public void testInternalExcluded() {
+        setUp();
+
+        List<Row> output = executeAndParse("--exclude-internal-topics");
+
+        assertEquals(expectedTestTopicOffsets(), output);
+    }
+
+    @ClusterTest
+    public void testTopicNameArg() {
+        setUp();
+
+        IntStream.range(1, topicCount + 1).forEach(i -> {
+            List<Row> offsets = executeAndParse("--topic", getTopicName(i));
+
+            assertEquals(expectedOffsetsForTopic(i), offsets, () -> "Offset output did not match for " + getTopicName(i));
+        });
+    }
+
+    @ClusterTest
+    public void testTopicPatternArg() {
+        setUp();
+
+        List<Row> offsets = executeAndParse("--topic", "topic.*");
+
+        assertEquals(expectedTestTopicOffsets(), offsets);
+    }
+
+    @ClusterTest
+    public void testPartitionsArg() {
+        setUp();
+
+        List<Row> offsets = executeAndParse("--partitions", "0,1");
+
+        assertEquals(expectedOffsetsWithInternal().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets);
+    }
+
+    @ClusterTest
+    public void testTopicPatternArgWithPartitionsArg() {
+        setUp();
+
+        List<Row> offsets = executeAndParse("--topic", "topic.*", "--partitions", "0,1");
+
+        assertEquals(expectedTestTopicOffsets().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets);
+    }
+
+    @ClusterTest
+    public void testTopicPartitionsArg() {
+        setUp();
+
+        List<Row> offsets = executeAndParse("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3");
+        List<Row> expected = Arrays.asList(
+                new Row("__consumer_offsets", 3, 0L),
+                new Row("topic1", 0, 1L),
+                new Row("topic2", 1, 2L),
+                new Row("topic3", 2, 3L),
+                new Row("topic4", 2, 4L)
+        );
+
+        assertEquals(expected, offsets);
+    }
+
+    @ClusterTest
+    public void testGetLatestOffsets() {
+        setUp();
+
+        for (String time : new String[] {"-1", "latest"}) {

Review Comment:
   I also didn't find a way to use `ParameterizedTest` and `ValueSource` together with `ClusterTest`. It throws an error about `ParameterResolver`, so I went with this approach.



##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ZK)
+@Tag("integration")
+public class GetOffsetShellTest {
+    private final int topicCount = 4;
+    private final int offsetTopicPartitionCount = 4;
+    private final ClusterInstance cluster;
+
+    public GetOffsetShellTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    private String getTopicName(int i) {
+        return "topic" + i;
+    }
+
+    public void setUp() {
+        cluster.config().serverProperties().put("auto.create.topics.enable", false);
+        cluster.config().serverProperties().put("offsets.topic.replication.factor", "1");
+        cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount));
+
+        try (Admin admin = Admin.create(cluster.config().adminClientProperties())) {
+            List<NewTopic> topics = new ArrayList<>();
+
+            IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short) 1)));
+
+            admin.createTopics(topics);
+        }
+
+        Properties props = new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.config().producerProperties().get("bootstrap.servers"));
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+            IntStream.range(0, topicCount + 1)
+                .forEach(i -> IntStream.range(0, i * i)
+                        .forEach(msgCount -> producer.send(
+                                new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)))
+                );
+        }
+    }
+
+    static class Row {
+        private String name;
+        private int partition;
+        private Long timestamp;
+
+        public Row(String name, int partition, Long timestamp) {
+            this.name = name;
+            this.partition = partition;
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) return true;
+
+            if (!(o instanceof Row)) return false;
+
+            Row r = (Row) o;
+
+            return name.equals(r.name) && partition == r.partition && Objects.equals(timestamp, r.timestamp);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(name, partition, timestamp);
+        }
+    }
+
+    @ClusterTest
+    public void testNoFilterOptions() {
+        setUp();
+
+        List<Row> output = executeAndParse();
+
+        assertEquals(expectedOffsetsWithInternal(), output);
+    }
+
+    @ClusterTest
+    public void testInternalExcluded() {
+        setUp();

Review Comment:
   This thing confuses me, but I couldn't find any workaround.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1286837845


##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    private static final Pattern TOPIC_PARTITION_PATTERN = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException, TerseException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        GetOffsetShellOptions options = new GetOffsetShellOptions(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets(options);
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private static class GetOffsetShellOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> topicPartitionsOpt;
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> partitionsOpt;
+        private final OptionSpec<String> timeOpt;
+        private final OptionSpec<String> commandConfigOpt;
+        private final OptionSpec<String> effectiveBrokerListOpt;
+        private final OptionSpecBuilder excludeInternalTopicsOpt;
+
+        public GetOffsetShellOptions(String[] args) throws TerseException {
+            super(args);
+
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                    .ofType(String.class);
+            topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + TOPIC_PARTITION_PATTERN + "'." +
+                            " The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
+                            " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
+                    .withRequiredArg()
+                    .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                    .withRequiredArg()
+                    .describedAs("partition ids")
+                    .ofType(String.class);
+            timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
+                    .withRequiredArg()
+                    .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                    .ofType(String.class)
+                    .defaultsTo("latest");
+            commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
+
+            if (args.length == 0) {
+                CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
+            }
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                throw new TerseException(e.getMessage());
+            }
+
+            if (options.has(bootstrapServerOpt)) {
+                effectiveBrokerListOpt = bootstrapServerOpt;
+            } else {
+                effectiveBrokerListOpt = brokerListOpt;
+            }
+
+            CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
+
+            String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+            try {
+                ToolsUtils.validateBootstrapServer(brokerList);
+            } catch (IllegalArgumentException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+        }
+
+        public boolean hasTopicPartitionsOpt() {
+            return options.has(topicPartitionsOpt);
+        }
+
+        public String topicPartitionsOpt() {
+            return options.valueOf(topicPartitionsOpt);
+        }
+
+        public boolean hasTopicOpt() {
+            return options.has(topicOpt);
+        }
+
+        public String topicOpt() {
+            return options.valueOf(topicOpt);
+        }
+
+        public boolean hasPartitionsOpt() {
+            return options.has(partitionsOpt);
+        }
+
+        public String partitionsOpt() {
+            return options.valueOf(partitionsOpt);
+        }
+
+        public String timeOpt() {
+            return options.valueOf(timeOpt);
+        }
+
+        public boolean hasCommandConfigOpt() {
+            return options.has(commandConfigOpt);
+        }
+
+        public String commandConfigOpt() {
+            return options.valueOf(commandConfigOpt);
+        }
+
+        public String effectiveBrokerListOpt() {
+            return options.valueOf(effectiveBrokerListOpt);
+        }
+
+        public boolean hasExcludeInternalTopicsOpt() {
+            return options.has(excludeInternalTopicsOpt);
+        }
+    }
+
+    public Map<TopicPartition, Long> fetchOffsets(GetOffsetShellOptions options) throws IOException, ExecutionException, InterruptedException, TerseException {
+        String clientId = "GetOffsetShell";
+        String brokerList = options.effectiveBrokerListOpt();
+
+        if (options.hasTopicPartitionsOpt() && (options.hasTopicOpt() || options.hasPartitionsOpt())) {
+            throw new TerseException("--topic-partitions cannot be used with --topic or --partitions");
+        }
+
+        boolean excludeInternalTopics = options.hasExcludeInternalTopicsOpt();
+        OffsetSpec offsetSpec = parseOffsetSpec(options.timeOpt());
+
+        TopicPartitionFilter topicPartitionFilter;
+
+        if (options.hasTopicPartitionsOpt()) {
+            topicPartitionFilter = createTopicPartitionFilterWithPatternList(options.topicPartitionsOpt());
+        } else {
+            topicPartitionFilter = createTopicPartitionFilterWithTopicAndPartitionPattern(
+                    options.hasTopicOpt() ? options.topicOpt() : null,
+                    options.partitionsOpt()
+            );
+        }
+
+        Properties config;
+
+        if (options.hasCommandConfigOpt()) {
+            config = Utils.loadProps(options.commandConfigOpt());
+        } else {
+            config = new Properties();
+        }
+
+        config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId);
+
+        try (Admin adminClient = Admin.create(config)) {
+            List<TopicPartition> partitionInfos = listPartitionInfos(adminClient, topicPartitionFilter, excludeInternalTopics);
+
+            if (partitionInfos.isEmpty()) {
+                throw new TerseException("Could not match any topic-partitions with the specified filters");
+            }
+
+            Map<TopicPartition, OffsetSpec> timestampsToSearch = partitionInfos.stream().collect(Collectors.toMap(tp -> tp, tp -> offsetSpec));
+
+            ListOffsetsResult listOffsetsResult = adminClient.listOffsets(timestampsToSearch);
+
+            TreeMap<TopicPartition, Long> partitionOffsets = new TreeMap<>(Comparator.comparing(TopicPartition::toString));
+
+            for (TopicPartition partition : partitionInfos) {
+                ListOffsetsResultInfo partitionInfo;
+
+                try {
+                    partitionInfo = listOffsetsResult.partitionResult(partition).get();
+                } catch (KafkaException e) {
+                    System.err.println("Skip getting offsets for topic-partition " + partition.toString() + " due to error: " + e.getMessage());
+
+                    continue;
+                } catch (InterruptedException | ExecutionException ignored) {
+                    continue;
+                }
+
+                if (partitionInfo.offset() != ListOffsetsResponse.UNKNOWN_OFFSET) {
+                    partitionOffsets.put(partition, partitionInfo.offset());
+                }
+            }
+
+            return partitionOffsets;
+        }
+    }
+
+    private OffsetSpec parseOffsetSpec(String listOffsetsTimestamp) throws TerseException {
+        switch (listOffsetsTimestamp) {
+            case "earliest":
+                return OffsetSpec.earliest();
+            case "latest":
+                return OffsetSpec.latest();
+            case "max-timestamp":
+                return OffsetSpec.maxTimestamp();
+            default:
+                long timestamp;
+
+                try {
+                    timestamp = Long.parseLong(listOffsetsTimestamp);
+                } catch (NumberFormatException e) {
+                    throw new TerseException("Malformed time argument " + listOffsetsTimestamp + ". " +
+                            "Please use -1 or latest / -2 or earliest / -3 or max-timestamp, or a specified long format timestamp");
+                }
+
+                if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
+                    return OffsetSpec.earliest();
+                } else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
+                    return OffsetSpec.latest();
+                } else if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+                    return OffsetSpec.maxTimestamp();
+                } else {
+                    return OffsetSpec.forTimestamp(timestamp);
+                }
+        }
+    }
+
+    /**
+     * Creates a topic-partition filter based on a list of patterns.
+     * Expected format:
+     * List: TopicPartitionPattern(, TopicPartitionPattern)*
+     * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+     * TopicPattern: REGEX
+     * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+     */
+    public TopicPartitionFilter createTopicPartitionFilterWithPatternList(String topicPartitions) {
+        List<String> ruleSpecs = Arrays.asList(topicPartitions.split(","));
+        List<TopicPartitionFilter> rules = ruleSpecs.stream().map(ruleSpec -> {
+            try {
+                return parseRuleSpec(ruleSpec);
+            } catch (TerseException e) {
+                throw new RuntimeException(e);
+            }
+        }).collect(Collectors.toList());
+
+        return new CompositeTopicPartitionFilter(rules);
+    }
+
+    /**
+     * Creates a topic-partition filter based on a topic pattern and a set of partition ids.
+     */
+    public TopicPartitionFilter createTopicPartitionFilterWithTopicAndPartitionPattern(String topicOpt, String partitionIds) throws TerseException {
+        return new TopicFilterAndPartitionFilter(
+                new IncludeList(topicOpt != null ? topicOpt : ".*"),
+                new PartitionsSetFilter(createPartitionSet(partitionIds))
+        );
+    }
+
+    private Set<Integer> createPartitionSet(String partitionsString) throws TerseException {
+        Set<Integer> partitions;
+
+        if (partitionsString == null || partitionsString.isEmpty()) {
+            partitions = Collections.emptySet();
+        } else {
+            try {
+                partitions = Arrays.stream(partitionsString.split(",")).map(Integer::parseInt).collect(Collectors.toSet());
+            } catch (NumberFormatException e) {
+                throw new TerseException("--partitions expects a comma separated list of numeric " +
+                        "partition ids, but received: " + partitionsString);
+            }
+        }
+
+        return partitions;
+    }
+
+    /**
+     * Return the partition infos. Filter them with topicPartitionFilter.
+     */
+    private List<TopicPartition> listPartitionInfos(
+            Admin client,
+            TopicPartitionFilter topicPartitionFilter,
+            boolean excludeInternalTopics
+    ) throws ExecutionException, InterruptedException {
+        ListTopicsOptions listTopicsOptions = new ListTopicsOptions().listInternal(!excludeInternalTopics);
+        Set<String> topics = client.listTopics(listTopicsOptions).names().get();
+        Set<String> filteredTopics = topics.stream().filter(topicPartitionFilter::isTopicAllowed).collect(Collectors.toSet());
+
+        return client.describeTopics(filteredTopics).allTopicNames().get().entrySet().stream().flatMap(
+                topic -> topic.getValue().partitions().stream().map(
+                        tp -> new TopicPartition(topic.getKey(), tp.partition())
+                ).filter(topicPartitionFilter::isTopicPartitionAllowed)
+        ).collect(Collectors.toList());
+    }
+
+    private TopicPartitionFilter parseRuleSpec(String ruleSpec) throws TerseException, RuntimeException {
+        Matcher matcher = TOPIC_PARTITION_PATTERN.matcher(ruleSpec);
+
+        if (!matcher.matches())
+            throw new TerseException("Invalid rule specification: " + ruleSpec);

Review Comment:
   Fair enough, thanks for the clarification



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1679474019

   @mimaison please, take a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dengziming commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1711292569

   @ruslankrivoshein I mean that you should update the description of this PR and clarify what you have changed; then we can merge it if CI passes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1255500374


##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    Pattern topicPartitionPattern = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");

Review Comment:
   I would make this private as in the original code.
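
   For illustration, a minimal sketch of that change (the uppercase constant name appears elsewhere in the PR's quoted code; treating the field as `static final` is only a suggestion on top of making it private, and the regex itself is unchanged):
   
   ```
   import java.util.regex.Pattern;

   public class GetOffsetShell {
       // Private constant instead of the package-private instance field; regex unchanged from the diff.
       private static final Pattern TOPIC_PARTITION_PATTERN =
               Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
   }
   ```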



##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    Pattern topicPartitionPattern = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        getOffsetShell.parseArgs(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets();
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private OptionSet options;
+    private OptionSpec<String> topicPartitionsOpt;
+    private OptionSpec<String> topicOpt;
+    private OptionSpec<String> partitionsOpt;
+    private OptionSpec<String> timeOpt;
+    private OptionSpec<String> commandConfigOpt;
+    private OptionSpec<String> effectiveBrokerListOpt;
+    private OptionSpecBuilder excludeInternalTopicsOpt;
+
+    public void parseArgs(final String[] args) {
+        final OptionParser parser = new OptionParser(false);
+
+        OptionSpec<String> brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                .withRequiredArg()
+                .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                .ofType(String.class);
+        OptionSpec<String> bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                .requiredUnless("broker-list")
+                .withRequiredArg()
+                .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                .ofType(String.class);
+        topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + topicPartitionPattern + "'." +
+                        " The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
+                        " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
+                .withRequiredArg()
+                .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+                .ofType(String.class);
+        topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                .withRequiredArg()
+                .describedAs("topic")
+                .ofType(String.class);
+        partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
+                .withRequiredArg()
+                .describedAs("partition ids")
+                .ofType(String.class);
+        timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
+                .withRequiredArg()
+                .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                .ofType(String.class)
+                .defaultsTo("latest");
+        commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+        excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
+
+        if (args.length == 0) {
+            CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
+        }
+
+        options = parser.parse(args);
+
+        if (options.has(bootstrapServerOpt)) {
+            effectiveBrokerListOpt = bootstrapServerOpt;
+        } else {
+            effectiveBrokerListOpt = brokerListOpt;
+        }
+
+        CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
+
+        String brokerList = options.valueOf(effectiveBrokerListOpt);
+
+        ToolsUtils.validatePortOrDie(parser, brokerList);

Review Comment:
   To be consistent with other tools, I would call it `validatePortOrExit` and move it inside `CommandLineUtils`, deleting the `ToolsUtils` class.
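
   For illustration, a rough sketch of what that helper could look like (the wrapper class name here is just a placeholder, the error wording is only a suggestion, and `CommandLineUtils.printUsageAndExit` is the helper already used elsewhere in this diff):
   
   ```
   import java.util.Arrays;
   import joptsimple.OptionParser;
   import org.apache.kafka.common.utils.Utils;
   import org.apache.kafka.server.util.CommandLineUtils;

   final class ValidatePortSketch {
       // Fail fast if any "host:port" entry has no parseable port.
       static void validatePortOrExit(OptionParser parser, String hostPort) {
           String[] hostPorts = hostPort.contains(",") ? hostPort.split(",") : new String[]{hostPort};
           boolean allValid = Arrays.stream(hostPorts).allMatch(hp -> Utils.getPort(hp) != null);
           if (!allValid) {
               CommandLineUtils.printUsageAndExit(parser, "Please provide a valid host:port list, e.g. host1:9091,host2:9092");
           }
       }
   }
   ```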



##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    Pattern topicPartitionPattern = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        getOffsetShell.parseArgs(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets();
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private OptionSet options;
+    private OptionSpec<String> topicPartitionsOpt;

Review Comment:
   Why not embed all the options in a private static class that can also host various utility methods on them? You can look at the new `JmxTool` for an example.
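
   For illustration, a rough sketch of that pattern (class and accessor names are placeholders, and only a couple of the options are shown):
   
   ```
   import joptsimple.OptionParser;
   import joptsimple.OptionSet;
   import joptsimple.OptionSpec;

   // In the tool this would be a private static nested class of GetOffsetShell,
   // holding the option specs plus small helpers so callers never juggle OptionSet directly.
   class GetOffsetShellOptions {
       private final OptionSet options;
       private final OptionSpec<String> topicOpt;
       private final OptionSpec<String> partitionsOpt;

       GetOffsetShellOptions(String[] args) {
           OptionParser parser = new OptionParser(false);
           topicOpt = parser.accepts("topic", "The topic to get the offsets for.")
                   .withRequiredArg().describedAs("topic").ofType(String.class);
           partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids.")
                   .withRequiredArg().describedAs("partition ids").ofType(String.class);
           options = parser.parse(args);
       }

       boolean hasTopic() {
           return options.has(topicOpt);
       }

       String topic() {
           return options.valueOf(topicOpt);
       }

       String partitions() {
           return options.valueOf(partitionsOpt);
       }
   }
   ```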



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1259763311


##########
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    Pattern topicPartitionPattern = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        getOffsetShell.parseArgs(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets();
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private OptionSet options;
+    private OptionSpec<String> topicPartitionsOpt;

Review Comment:
   Well, that's not exactly a duplicate, as mine also creates the JmxTool runnable for convenience. For the last part, I guess I didn't want to break ToolsTestUtils visibility.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1280546556


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   Maybe `validateBootstrapServerOrExit` would be a better method name. If you only print the version, the user will have no clue about what went wrong; I would rather print an error message that makes sense, something like "Error while validating the bootstrap address".
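
   For illustration, the rename with a message that points at the failing address might look roughly like this (the exact wording and the per-entry loop are just suggestions):
   
   ```
   import joptsimple.OptionParser;
   import org.apache.kafka.common.utils.Utils;
   import org.apache.kafka.server.util.CommandLineUtils;

   public class ToolsUtils {
       // Same check as before, but the name and the message describe what actually failed.
       public static void validateBootstrapServerOrExit(OptionParser parser, String hostPort) {
           for (String hp : hostPort.split(",")) {
               if (Utils.getPort(hp) == null) {
                   CommandLineUtils.printUsageAndExit(parser,
                           "Error while validating the bootstrap address: " + hp);
               }
           }
       }
   }
   ```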



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1282957101


##########
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.util.Arrays;
+
+public class ToolsUtils {
+    public static void validatePortOrExit(OptionParser parser, String hostPort) {

Review Comment:
   Yes, I think that's a reasonable approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

Posted by "ruslankrivoshein (via GitHub)" <gi...@apache.org>.
ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1634949921

   Well, that's it:
   ```
   $ bin/kafka-get-offsets.sh --broker-list :9092 --topic my-top --time 3
   WARNING: The 'broker-list' option is deprecated and will be removed in the next major release. Use the `bootstrap-server` option with the same syntax.
   my-top:0:0
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org